This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new ce65f4a  [SPARK-52071] Add `stream` example and revise `Trigger` names
ce65f4a is described below

commit ce65f4af5cfb5f079a1ba2ed99980030e2fb3cff
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun May 11 14:05:15 2025 -0700

    [SPARK-52071] Add `stream` example and revise `Trigger` names
    
    ### What changes were proposed in this pull request?
    
    This PR aims to add a `stream` example and revise `Trigger` names more consistently with the Scala version.
    
    ### Why are the changes needed?
    
    To give an illustrative example in `Swift` by porting `StructuredNetworkWordCount.scala`.
    - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is a change to an unreleased feature.
    
    ### How was this patch tested?
    
    Manual review.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #128 from dongjoon-hyun/SPARK-52071.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 Examples/stream/Package.swift                 | 37 +++++++++++++++
 Examples/stream/README.md                     | 68 +++++++++++++++++++++++++++
 Examples/stream/Sources/main.swift            | 49 +++++++++++++++++++
 README.md                                     |  2 +-
 Sources/SparkConnect/DataStreamWriter.swift   | 27 +++++------
 Tests/SparkConnectTests/DataStreamTests.swift |  2 +-
 6 files changed, 169 insertions(+), 16 deletions(-)

diff --git a/Examples/stream/Package.swift b/Examples/stream/Package.swift
new file mode 100644
index 0000000..8c8ef7a
--- /dev/null
+++ b/Examples/stream/Package.swift
@@ -0,0 +1,37 @@
+// swift-tools-version: 6.0
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+import PackageDescription
+
+let package = Package(
+  name: "SparkConnectSwiftNetworkWordCount",
+  platforms: [
+    .macOS(.v15)
+  ],
+  dependencies: [
+    .package(url: "https://github.com/apache/spark-connect-swift.git", branch: "main")
+  ],
+  targets: [
+    .executableTarget(
+      name: "SparkConnectSwiftNetworkWordCount",
+      dependencies: [.product(name: "SparkConnect", package: "spark-connect-swift")]
+    )
+  ]
+)
diff --git a/Examples/stream/README.md b/Examples/stream/README.md
new file mode 100644
index 0000000..ee8553e
--- /dev/null
+++ b/Examples/stream/README.md
@@ -0,0 +1,68 @@
+# A Swift Network Word Count Application with Apache Spark Connect Swift Client
+
+This is an example Swift stream processing application to show how to count words with the Apache Spark Connect Swift Client library.
+
+## Run `Spark Connect Server`
+
+```bash
+./sbin/start-connect-server.sh --wait -c spark.log.level=ERROR
+```
+
+## Run `Netcat` as a streaming input server
+
+You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using
+
+```bash
+nc -lk 9999
+```
+
+## Start streaming processing application
+
+```bash
+$ swift run
+...
+Connected to Apache Spark 4.0.0 Server
+```
+
+## Send input and check output
+
+Then, any lines typed in the terminal running the `Netcat` server will be counted and printed on screen every second.
+
+```bash
+$ nc -lk 9999
+apache spark
+apache hadoop
+```
+
+`Spark Connect Server` output will look something like the following.
+
+```bash
+-------------------------------------------
+Batch: 0
+-------------------------------------------
++----+--------+
+|word|count(1)|
++----+--------+
++----+--------+
+
+-------------------------------------------
+Batch: 1
+-------------------------------------------
++------+--------+
+|  word|count(1)|
++------+--------+
+|apache|       1|
+| spark|       1|
++------+--------+
+
+-------------------------------------------
+Batch: 2
+-------------------------------------------
++------+--------+
+|  word|count(1)|
++------+--------+
+|apache|       2|
+| spark|       1|
+|hadoop|       1|
++------+--------+
+```
diff --git a/Examples/stream/Sources/main.swift b/Examples/stream/Sources/main.swift
new file mode 100644
index 0000000..2ce9a40
--- /dev/null
+++ b/Examples/stream/Sources/main.swift
@@ -0,0 +1,49 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+import SparkConnect
+
+let spark = try await SparkSession.builder.getOrCreate()
+print("Connected to Apache Spark \(await spark.version) Server")
+
+let lines =
+  await spark
+  .readStream
+  .format("socket")
+  .option("host", "localhost")
+  .option("port", "9999")
+  .load()
+
+let word =
+  await lines
+  .selectExpr("explode(split(value, ' ')) as word")
+
+let wordCounts =
+  await word
+  .groupBy("word")
+  .agg("count(*)")
+
+let query =
+  try await wordCounts
+  .writeStream
+  .outputMode("complete")
+  .format("console")
+  .start()
+
+_ = try await query.awaitTermination()
diff --git a/README.md b/README.md
index 395a6a0..2049f8e 100644
--- a/README.md
+++ b/README.md
@@ -108,7 +108,7 @@ SELECT * FROM t
 +----+
 ```
 
-You can find more complete examples including Web Server application in the `Examples` directory.
+You can find more complete examples including Web Server and Streaming applications in the `Examples` directory.
 
 ## How to use `Spark SQL REPL` via `Spark Connect for Swift`
 
diff --git a/Sources/SparkConnect/DataStreamWriter.swift b/Sources/SparkConnect/DataStreamWriter.swift
index 0af04ff..9822899 100644
--- a/Sources/SparkConnect/DataStreamWriter.swift
+++ b/Sources/SparkConnect/DataStreamWriter.swift
@@ -19,10 +19,10 @@
 import Foundation
 
 public enum Trigger {
-  case OneTimeTrigger
-  case AvailableNowTrigger
-  case ProcessingTimeTrigger(intervalMs: Int64)
-  case ContinuousTrigger(intervalMs: Int64)
+  case OneTime
+  case AvailableNow
+  case ProcessingTime(_ intervalMs: Int64)
+  case Continuous(_ intervalMs: Int64)
 }
 
 /// An actor used to write a streaming `DataFrame` to external storage systems
@@ -32,7 +32,7 @@ public actor DataStreamWriter: Sendable {
 
   var source: String? = nil
 
-  var trigger: Trigger? = nil
+  var trigger: Trigger = Trigger.ProcessingTime(0)
 
   var path: String? = nil
 
@@ -155,15 +155,14 @@ public actor DataStreamWriter: Sendable {
     }
     writeStreamOperationStart.trigger =
       switch self.trigger {
-      case .ProcessingTimeTrigger(let intervalMs):
-          .processingTimeInterval("INTERVAL \(intervalMs) MILLISECOND")
-      case .OneTimeTrigger:
-          .once(true)
-      case .AvailableNowTrigger:
-          .availableNow(true)
-      case .ContinuousTrigger(let intervalMs):
-          .continuousCheckpointInterval("INTERVAL \(intervalMs) MILLISECOND")
-      default: .once(true)
+      case .ProcessingTime(let intervalMs):
+        .processingTimeInterval("INTERVAL \(intervalMs) MILLISECOND")
+      case .OneTime:
+        .once(true)
+      case .AvailableNow:
+        .availableNow(true)
+      case .Continuous(let intervalMs):
+        .continuousCheckpointInterval("INTERVAL \(intervalMs) MILLISECOND")
       }
     if let outputMode = self.outputMode {
       writeStreamOperationStart.outputMode = outputMode
diff --git a/Tests/SparkConnectTests/DataStreamTests.swift b/Tests/SparkConnectTests/DataStreamTests.swift
index 00af6ae..73fbdf4 100644
--- a/Tests/SparkConnectTests/DataStreamTests.swift
+++ b/Tests/SparkConnectTests/DataStreamTests.swift
@@ -52,7 +52,7 @@ struct DataStreamTests {
       .option("checkpointLocation", checkpoint)
       .outputMode("append")
       .format("orc")
-      .trigger(Trigger.ProcessingTimeTrigger(intervalMs: 1000))
+      .trigger(Trigger.ProcessingTime(1000))
       .start(output)
     #expect(try await query.isActive)
     // Wait for processing


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to