This is an automated email from the ASF dual-hosted git repository.
mgrund pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git
The following commit(s) were added to refs/heads/master by this push:
new b05a602  [MINOR] Enhance the quick start example and spark session example
b05a602 is described below
commit b05a60228c217f40f37e9f0b78ddf69f36e0da6e
Author: Xin Hao <[email protected]>
AuthorDate: Fri Aug 23 14:24:38 2024 +0200
[MINOR] Enhance the quick start example and spark session example
### What changes were proposed in this pull request?
Enhance the quick start example and spark session example
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Closes #54 from haoxins/enhance-the-start.
Lead-authored-by: Xin Hao <[email protected]>
Co-authored-by: Martin Grund <[email protected]>
Signed-off-by: Martin Grund <[email protected]>
---
cmd/spark-connect-example-spark-session/main.go | 24 +++++++++++++++---------
quick-start.md | 10 +++++++---
spark/sql/dataframe.go | 4 +++-
3 files changed, 25 insertions(+), 13 deletions(-)
diff --git a/cmd/spark-connect-example-spark-session/main.go b/cmd/spark-connect-example-spark-session/main.go
index 71f6f07..163009f 100644
--- a/cmd/spark-connect-example-spark-session/main.go
+++ b/cmd/spark-connect-example-spark-session/main.go
@@ -19,6 +19,7 @@ package main
import (
"context"
"flag"
+ "fmt"
"log"
"github.com/apache/spark-connect-go/v35/spark/sql/functions"
@@ -27,8 +28,13 @@ import (
"github.com/apache/spark-connect-go/v35/spark/sql/utils"
)
-var remote = flag.String("remote", "sc://localhost:15002",
- "the remote address of Spark Connect server to connect to")
+var (
+ remote = flag.String("remote", "sc://localhost:15002",
+ "the remote address of Spark Connect server to connect to")
+
+ filedir = flag.String("filedir", "/tmp",
+ "the root directory to save the files generated by this example program")
+)
func main() {
flag.Parse()
@@ -39,7 +45,7 @@ func main() {
}
defer utils.WarnOnError(spark.Stop, func(err error) {})
- df, err := spark.Sql(ctx, "select id2 from range(100)")
+ df, err := spark.Sql(ctx, "select id from range(100)")
if err != nil {
log.Fatalf("Failed: %s", err)
}
@@ -111,13 +117,13 @@ func main() {
err = df.Writer().Mode("overwrite").
Format("parquet").
- Save(ctx, "file:///tmp/spark-connect-write-example-output.parquet")
+ Save(ctx, fmt.Sprintf("file://%s/spark-connect-write-example-output.parquet", *filedir))
if err != nil {
log.Fatalf("Failed: %s", err)
}
df, err = spark.Read().Format("parquet").
- Load("file:///tmp/spark-connect-write-example-output.parquet")
+ Load(fmt.Sprintf("file://%s/spark-connect-write-example-output.parquet", *filedir))
if err != nil {
log.Fatalf("Failed: %s", err)
}
@@ -152,7 +158,7 @@ func main() {
err = df.Writer().Mode("overwrite").
Format("parquet").
- Save(ctx, "file:///tmp/spark-connect-write-example-output-one-partition.parquet")
+ Save(ctx, fmt.Sprintf("file://%s/spark-connect-write-example-output-one-partition.parquet", *filedir))
if err != nil {
log.Fatalf("Failed: %s", err)
}
@@ -165,7 +171,7 @@ func main() {
err = df.Writer().Mode("overwrite").
Format("parquet").
- Save(ctx, "file:///tmp/spark-connect-write-example-output-two-partition.parquet")
+ Save(ctx, fmt.Sprintf("file://%s/spark-connect-write-example-output-two-partition.parquet", *filedir))
if err != nil {
log.Fatalf("Failed: %s", err)
}
@@ -178,7 +184,7 @@ func main() {
err = df.Writer().Mode("overwrite").
Format("parquet").
- Save(ctx, "file:///tmp/spark-connect-write-example-output-repartition-with-column.parquet")
+ Save(ctx, fmt.Sprintf("file://%s/spark-connect-write-example-output-repartition-with-column.parquet", *filedir))
if err != nil {
log.Fatalf("Failed: %s", err)
}
@@ -196,7 +202,7 @@ func main() {
err = df.Writer().Mode("overwrite").
Format("parquet").
- Save(ctx, "file:///tmp/spark-connect-write-example-output-repartition-by-range-with-column.parquet")
+ Save(ctx, fmt.Sprintf("file://%s/spark-connect-write-example-output-repartition-by-range-with-column.parquet", *filedir))
if err != nil {
log.Fatalf("Failed: %s", err)
}
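Taken together, the hunks above apply one pattern throughout the example: build the output URI from the -filedir flag instead of hard-coding /tmp. Below is a minimal sketch of that pattern, using only the Writer()/Read() calls visible in the diff; outputPath and roundTrip are hypothetical helpers, not part of this commit.
```
package example

import (
	"context"
	"flag"
	"fmt"

	"github.com/apache/spark-connect-go/v35/spark/sql"
)

var filedir = flag.String("filedir", "/tmp", "root directory for example output")

// outputPath builds a file:// URI under the -filedir root, mirroring the
// fmt.Sprintf pattern introduced above (hypothetical helper).
func outputPath(name string) string {
	return fmt.Sprintf("file://%s/%s", *filedir, name)
}

// roundTrip writes df as parquet under -filedir and reads it back, using
// the same writer/reader chains as the hunks above (hypothetical helper).
func roundTrip(ctx context.Context, spark sql.SparkSession, df sql.DataFrame) (sql.DataFrame, error) {
	path := outputPath("spark-connect-write-example-output.parquet")
	if err := df.Writer().Mode("overwrite").Format("parquet").Save(ctx, path); err != nil {
		return nil, err
	}
	return spark.Read().Format("parquet").Load(path)
}
```
With the default -filedir of "/tmp", outputPath yields file:///tmp/..., the same URIs the hard-coded version used.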
diff --git a/quick-start.md b/quick-start.md
index 4ab28f4..0b3a514 100644
--- a/quick-start.md
+++ b/quick-start.md
@@ -20,6 +20,7 @@ package main
import (
"context"
"flag"
+ "fmt"
"log"
"github.com/apache/spark-connect-go/v35/spark/sql"
@@ -28,6 +29,9 @@ import (
var (
remote = flag.String("remote", "sc://localhost:15002",
"the remote address of Spark Connect server to connect to")
+
+ filedir = flag.String("filedir", "/tmp",
+ "the directory to save the files")
)
func main() {
@@ -79,13 +83,13 @@ func main() {
err = df.Writer().Mode("overwrite").
Format("parquet").
- Save(ctx, "file:///tmp/spark-connect-write-example-output.parquet")
+ Save(ctx, fmt.Sprintf("file://%s/spark-connect-write-example-output.parquet", *filedir))
if err != nil {
log.Fatalf("Failed: %s", err)
}
df, err = spark.Read().Format("parquet").
- Load("file:///tmp/spark-connect-write-example-output.parquet")
+ Load(fmt.Sprintf("file://%s/spark-connect-write-example-output.parquet", *filedir))
if err != nil {
log.Fatalf("Failed: %s", err)
}
@@ -120,7 +124,7 @@ sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.
## Run Spark Connect Client Application
```
-go run main.go
+go run main.go --filedir YOUR_TMP_DIR
```
You will see the client application connect to the Spark Connect server and print the output from your application.
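For orientation, the client loop the quick start builds up to is roughly the following; a hedged sketch, where the session construction (sql.NewSessionBuilder) is an assumption and everything else mirrors calls that appear verbatim in the diff above.
```
package main

import (
	"context"
	"flag"
	"log"

	"github.com/apache/spark-connect-go/v35/spark/sql"
	"github.com/apache/spark-connect-go/v35/spark/sql/utils"
)

var remote = flag.String("remote", "sc://localhost:15002",
	"the remote address of Spark Connect server to connect to")

func main() {
	flag.Parse()
	ctx := context.Background()

	// Assumed session-builder API; substitute your client version's
	// session constructor if it differs.
	spark, err := sql.NewSessionBuilder().Remote(*remote).Build(ctx)
	if err != nil {
		log.Fatalf("Failed to connect: %s", err)
	}
	defer utils.WarnOnError(spark.Stop, func(err error) {})

	df, err := spark.Sql(ctx, "select id from range(10)")
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}
	_ = df // the write/read round trip continues as in the hunks above
}
```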
diff --git a/spark/sql/dataframe.go b/spark/sql/dataframe.go
index 187fd2b..cd8ba64 100644
--- a/spark/sql/dataframe.go
+++ b/spark/sql/dataframe.go
@@ -56,9 +56,11 @@ type DataFrame interface {
Repartition(numPartitions int, columns []string) (DataFrame, error)
// RepartitionByRange re-partitions a data frame by range partition.
RepartitionByRange(numPartitions int, columns []RangePartitionColumn) (DataFrame, error)
-
+ // Filter filters the data frame by a column condition.
Filter(condition column.Column) (DataFrame, error)
+ // FilterByString filters the data frame by a string condition.
FilterByString(condition string) (DataFrame, error)
+ // Col returns a column by name.
Col(name string) (column.Column, error)
}
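The newly documented methods compose as below; a short hedged sketch written against the interface exactly as shown in the hunk (filterSmallIds is a hypothetical helper, not part of this commit).
```
package example

import "github.com/apache/spark-connect-go/v35/spark/sql"

// filterSmallIds demonstrates Col and FilterByString from the DataFrame
// interface documented above (hypothetical helper).
func filterSmallIds(df sql.DataFrame) (sql.DataFrame, error) {
	// Col returns a column by name; comparisons built from it yield the
	// column.Column condition that Filter(condition column.Column) expects.
	if _, err := df.Col("id"); err != nil {
		return nil, err
	}
	// FilterByString expresses the same condition as a SQL string.
	return df.FilterByString("id < 10")
}
```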
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]