This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
new ce65f4a [SPARK-52071] Add `stream` example and revise `Trigger` names
ce65f4a is described below
commit ce65f4af5cfb5f079a1ba2ed99980030e2fb3cff
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun May 11 14:05:15 2025 -0700
[SPARK-52071] Add `stream` example and revise `Trigger` names
### What changes were proposed in this pull request?
This PR aims to add a `stream` example and revise the `Trigger` names to be
more consistent with the Scala version.
### Why are the changes needed?
To give an illustrative example in `Swift` by porting
`StructuredNetworkWordCount.scala`.
-
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
### Does this PR introduce _any_ user-facing change?
No. This is a change to an unreleased feature.
### How was this patch tested?
Manual review.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #128 from dongjoon-hyun/SPARK-52071.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
Examples/stream/Package.swift | 37 +++++++++++++++
Examples/stream/README.md | 68 +++++++++++++++++++++++++++
Examples/stream/Sources/main.swift | 49 +++++++++++++++++++
README.md | 2 +-
Sources/SparkConnect/DataStreamWriter.swift | 27 +++++------
Tests/SparkConnectTests/DataStreamTests.swift | 2 +-
6 files changed, 169 insertions(+), 16 deletions(-)
diff --git a/Examples/stream/Package.swift b/Examples/stream/Package.swift
new file mode 100644
index 0000000..8c8ef7a
--- /dev/null
+++ b/Examples/stream/Package.swift
@@ -0,0 +1,37 @@
+// swift-tools-version: 6.0
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+import PackageDescription
+
+let package = Package(
+ name: "SparkConnectSwiftNetworkWordCount",
+ platforms: [
+ .macOS(.v15)
+ ],
+ dependencies: [
+ .package(url: "https://github.com/apache/spark-connect-swift.git", branch:
"main")
+ ],
+ targets: [
+ .executableTarget(
+ name: "SparkConnectSwiftNetworkWordCount",
+ dependencies: [.product(name: "SparkConnect", package:
"spark-connect-swift")]
+ )
+ ]
+)
diff --git a/Examples/stream/README.md b/Examples/stream/README.md
new file mode 100644
index 0000000..ee8553e
--- /dev/null
+++ b/Examples/stream/README.md
@@ -0,0 +1,68 @@
+# A Swift Network Word Count Application with Apache Spark Connect Swift Client
+
+This is an example Swift stream processing application to show how to count
words with Apache Spark Connect Swift Client library.
+
+## Run `Spark Connect Server`
+
+```bash
+./sbin/start-connect-server.sh --wait -c spark.log.level=ERROR
+```
+
+## Run `Netcat` as a streaming input server
+
+You will first need to run Netcat (a small utility found in most Unix-like
systems) as a data server by using
+
+```bash
+nc -lk 9999
+```
+
+## Start streaming processing application
+
+```bash
+$ swift run
+...
+Connected to Apache Spark 4.0.0 Server
+```
+
+## Send input and check output
+
+Then, any lines typed in the terminal running the `Netcat` server will be
counted and printed on screen every second.
+
+```bash
+$ nc -lk 9999
+apache spark
+apache hadoop
+```
+
+ `Spark Connect Server` output will look something like the following.
+
+```bash
+-------------------------------------------
+Batch: 0
+-------------------------------------------
++----+--------+
+|word|count(1)|
++----+--------+
++----+--------+
+
+-------------------------------------------
+Batch: 1
+-------------------------------------------
++------+--------+
+| word|count(1)|
++------+--------+
+|apache| 1|
+| spark| 1|
++------+--------+
+
+-------------------------------------------
+Batch: 2
+-------------------------------------------
++------+--------+
+| word|count(1)|
++------+--------+
+|apache| 2|
+| spark| 1|
+|hadoop| 1|
++------+--------+
+```
diff --git a/Examples/stream/Sources/main.swift
b/Examples/stream/Sources/main.swift
new file mode 100644
index 0000000..2ce9a40
--- /dev/null
+++ b/Examples/stream/Sources/main.swift
@@ -0,0 +1,49 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+import SparkConnect
+
+let spark = try await SparkSession.builder.getOrCreate()
+print("Connected to Apache Spark \(await spark.version) Server")
+
+let lines =
+ await spark
+ .readStream
+ .format("socket")
+ .option("host", "localhost")
+ .option("port", "9999")
+ .load()
+
+let word =
+ await lines
+ .selectExpr("explode(split(value, ' ')) as word")
+
+let wordCounts =
+ await word
+ .groupBy("word")
+ .agg("count(*)")
+
+let query =
+ try await wordCounts
+ .writeStream
+ .outputMode("complete")
+ .format("console")
+ .start()
+
+_ = try await query.awaitTermination()
diff --git a/README.md b/README.md
index 395a6a0..2049f8e 100644
--- a/README.md
+++ b/README.md
@@ -108,7 +108,7 @@ SELECT * FROM t
+----+
```
-You can find more complete examples including Web Server application in the
`Examples` directory.
+You can find more complete examples including Web Server and Streaming
applications in the `Examples` directory.
## How to use `Spark SQL REPL` via `Spark Connect for Swift`
diff --git a/Sources/SparkConnect/DataStreamWriter.swift
b/Sources/SparkConnect/DataStreamWriter.swift
index 0af04ff..9822899 100644
--- a/Sources/SparkConnect/DataStreamWriter.swift
+++ b/Sources/SparkConnect/DataStreamWriter.swift
@@ -19,10 +19,10 @@
import Foundation
public enum Trigger {
- case OneTimeTrigger
- case AvailableNowTrigger
- case ProcessingTimeTrigger(intervalMs: Int64)
- case ContinuousTrigger(intervalMs: Int64)
+ case OneTime
+ case AvailableNow
+ case ProcessingTime(_ intervalMs: Int64)
+ case Continuous(_ intervalMs: Int64)
}
/// An actor used to write a streaming `DataFrame` to external storage systems
@@ -32,7 +32,7 @@ public actor DataStreamWriter: Sendable {
var source: String? = nil
- var trigger: Trigger? = nil
+ var trigger: Trigger = Trigger.ProcessingTime(0)
var path: String? = nil
@@ -155,15 +155,14 @@ public actor DataStreamWriter: Sendable {
}
writeStreamOperationStart.trigger =
switch self.trigger {
- case .ProcessingTimeTrigger(let intervalMs):
- .processingTimeInterval("INTERVAL \(intervalMs) MILLISECOND")
- case .OneTimeTrigger:
- .once(true)
- case .AvailableNowTrigger:
- .availableNow(true)
- case .ContinuousTrigger(let intervalMs):
- .continuousCheckpointInterval("INTERVAL \(intervalMs) MILLISECOND")
- default: .once(true)
+ case .ProcessingTime(let intervalMs):
+ .processingTimeInterval("INTERVAL \(intervalMs) MILLISECOND")
+ case .OneTime:
+ .once(true)
+ case .AvailableNow:
+ .availableNow(true)
+ case .Continuous(let intervalMs):
+ .continuousCheckpointInterval("INTERVAL \(intervalMs) MILLISECOND")
}
if let outputMode = self.outputMode {
writeStreamOperationStart.outputMode = outputMode
diff --git a/Tests/SparkConnectTests/DataStreamTests.swift
b/Tests/SparkConnectTests/DataStreamTests.swift
index 00af6ae..73fbdf4 100644
--- a/Tests/SparkConnectTests/DataStreamTests.swift
+++ b/Tests/SparkConnectTests/DataStreamTests.swift
@@ -52,7 +52,7 @@ struct DataStreamTests {
.option("checkpointLocation", checkpoint)
.outputMode("append")
.format("orc")
- .trigger(Trigger.ProcessingTimeTrigger(intervalMs: 1000))
+ .trigger(Trigger.ProcessingTime(1000))
.start(output)
#expect(try await query.isActive)
// Wait for processing
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]