This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6eed08cb0c12 [SPARK-47199][PYTHON][TESTS] Add prefix into
TemporaryDirectory to avoid flakiness
6eed08cb0c12 is described below
commit 6eed08cb0c12c46b9de4665ab130cea1695b9a5b
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Tue Feb 27 23:11:25 2024 -0800
[SPARK-47199][PYTHON][TESTS] Add prefix into TemporaryDirectory to avoid
flakiness
### What changes were proposed in this pull request?
This PR proposes to set a `prefix` for each `TemporaryDirectory` to deflake the
tests. The tests sometimes fail because the temporary directory names are the
same (https://github.com/apache/spark/actions/runs/8066850485/job/22036007390).
```
File "/__w/spark/spark/python/pyspark/sql/dataframe.py", line ?, in
pyspark.sql.dataframe.DataFrame.writeStream
Failed example:
with tempfile.TemporaryDirectory() as d:
# Create a table with Rate source.
df.writeStream.toTable(
"my_table", checkpointLocation=d)
Exception raised:
Traceback (most recent call last):
File "/usr/lib/python3.11/doctest.py", line 1353, in __run
exec(compile(example.source, filename, "single",
File "<doctest pyspark.sql.dataframe.DataFrame.writeStream[3]>", line
1, in <module>
with tempfile.TemporaryDirectory() as d:
File "/usr/lib/python3.11/tempfile.py", line 1043, in __exit__
self.cleanup()
File "/usr/lib/python3.11/tempfile.py", line 1047, in cleanup
self._rmtree(self.name, ignore_errors=self._ignore_cleanup_errors)
File "/usr/lib/python3.11/tempfile.py", line 1029, in _rmtree
_rmtree(name, onerror=onerror)
File "/usr/lib/python3.11/shutil.py", line 738, in rmtree
onerror(os.rmdir, path, sys.exc_info())
File "/usr/lib/python3.11/shutil.py", line 736, in rmtree
os.rmdir(path, dir_fd=dir_fd)
OSError: [Errno 39] Directory not empty:
'/__w/spark/spark/python/target/4f062b09-213f-4ac2-a10a-2d704990141b/tmp29irqweq'
```
### Why are the changes needed?
To make the tests more robust.
### Does this PR introduce _any_ user-facing change?
No, test-only. The user-facing documentation (the doctest examples) changes
slightly, but the change is trivial.
### How was this patch tested?
Manually tested. The CI run for this PR should exercise these changes as well.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45298 from HyukjinKwon/SPARK-47199.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
dev/connect-check-protos.py | 2 +-
python/pyspark/broadcast.py | 6 +-
python/pyspark/context.py | 26 ++++----
python/pyspark/files.py | 2 +-
.../connect/test_legacy_mode_classification.py | 2 +-
.../tests/connect/test_legacy_mode_evaluation.py | 6 +-
.../ml/tests/connect/test_legacy_mode_feature.py | 6 +-
.../ml/tests/connect/test_legacy_mode_pipeline.py | 2 +-
.../ml/tests/connect/test_legacy_mode_tuning.py | 2 +-
python/pyspark/ml/tests/test_als.py | 2 +-
python/pyspark/rdd.py | 18 +++---
python/pyspark/sql/catalog.py | 8 +--
python/pyspark/sql/dataframe.py | 6 +-
python/pyspark/sql/protobuf/functions.py | 4 +-
python/pyspark/sql/readwriter.py | 73 +++++++++++-----------
python/pyspark/sql/session.py | 2 +-
python/pyspark/sql/streaming/readwriter.py | 50 ++++++++-------
.../sql/tests/connect/client/test_artifact.py | 16 ++---
.../sql/tests/connect/test_connect_basic.py | 20 +++---
.../pyspark/sql/tests/streaming/test_streaming.py | 2 +-
python/pyspark/sql/tests/test_catalog.py | 2 +-
python/pyspark/sql/tests/test_python_datasource.py | 6 +-
python/pyspark/sql/tests/test_udf_profiler.py | 4 +-
python/pyspark/sql/tests/test_udtf.py | 16 ++---
python/pyspark/tests/test_install_spark.py | 2 +-
python/pyspark/tests/test_memory_profiler.py | 4 +-
python/pyspark/tests/test_profiler.py | 2 +-
python/pyspark/tests/test_shuffle.py | 10 +--
python/pyspark/util.py | 2 +-
29 files changed, 154 insertions(+), 149 deletions(-)
diff --git a/dev/connect-check-protos.py b/dev/connect-check-protos.py
index 513938f8d4f8..ffc74d7b1608 100755
--- a/dev/connect-check-protos.py
+++ b/dev/connect-check-protos.py
@@ -45,7 +45,7 @@ def run_cmd(cmd):
def check_connect_protos():
print("Start checking the generated codes in pyspark-connect.")
- with tempfile.TemporaryDirectory() as tmp:
+ with tempfile.TemporaryDirectory(prefix="check_connect_protos") as tmp:
run_cmd(f"{SPARK_HOME}/dev/connect-gen-protos.sh {tmp}")
result = filecmp.dircmp(
f"{SPARK_HOME}/python/pyspark/sql/connect/proto/",
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index 1f2b3263245a..a5a68d779781 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -174,7 +174,7 @@ class Broadcast(Generic[T]):
Write a pickled representation of `b` to the open temp file.
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="dump") as d:
... path = os.path.join(d, "test.txt")
... with open(path, "wb") as f:
... b.dump(b.value, f)
@@ -215,7 +215,7 @@ class Broadcast(Generic[T]):
Read the pickled representation of value from temp file.
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="load_from_path") as d:
... path = os.path.join(d, "test.txt")
... with open(path, "wb") as f:
... b.dump(b.value, f)
@@ -250,7 +250,7 @@ class Broadcast(Generic[T]):
Read the pickled representation of value from the open temp file.
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="load") as d:
... path = os.path.join(d, "test.txt")
... with open(path, "wb") as f:
... b.dump(b.value, f)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 19d3608c3825..bcc9fbf935ba 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -899,7 +899,7 @@ class SparkContext:
--------
>>> import os
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="pickleFile") as d:
... # Write a temporary pickled file
... path1 = os.path.join(d, "pickled1")
... sc.parallelize(range(10)).saveAsPickleFile(path1, 3)
@@ -962,7 +962,7 @@ class SparkContext:
--------
>>> import os
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="textFile") as d:
... path1 = os.path.join(d, "text1")
... path2 = os.path.join(d, "text2")
...
@@ -1052,7 +1052,7 @@ class SparkContext:
--------
>>> import os
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="wholeTextFiles") as d:
... # Write a temporary text file
... with open(os.path.join(d, "1.txt"), "w") as f:
... _ = f.write("123")
@@ -1107,7 +1107,7 @@ class SparkContext:
--------
>>> import os
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="binaryFiles") as d:
... # Write a temporary binary file
... with open(os.path.join(d, "1.bin"), "wb") as f1:
... _ = f1.write(b"binary data I")
@@ -1156,7 +1156,7 @@ class SparkContext:
--------
>>> import os
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="binaryRecords") as d:
... # Write a temporary file
... with open(os.path.join(d, "1.bin"), "w") as f:
... for i in range(3):
@@ -1247,7 +1247,7 @@ class SparkContext:
>>> output_format_class =
"org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="sequenceFile") as d:
... path = os.path.join(d, "hadoop_file")
...
... # Write a temporary Hadoop file
@@ -1345,7 +1345,7 @@ class SparkContext:
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="newAPIHadoopFile") as d:
... path = os.path.join(d, "new_hadoop_file")
...
... # Write a temporary Hadoop file
@@ -1437,7 +1437,7 @@ class SparkContext:
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="newAPIHadoopRDD") as d:
... path = os.path.join(d, "new_hadoop_file")
...
... # Create the conf for writing
@@ -1544,7 +1544,7 @@ class SparkContext:
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="hadoopFile") as d:
... path = os.path.join(d, "old_hadoop_file")
...
... # Write a temporary Hadoop file
@@ -1634,7 +1634,7 @@ class SparkContext:
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="hadoopRDD") as d:
... path = os.path.join(d, "old_hadoop_file")
...
... # Create the conf for writing
@@ -1694,7 +1694,7 @@ class SparkContext:
--------
>>> import os
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="union") as d:
... # generate a text RDD
... with open(os.path.join(d, "union-text.txt"), "w") as f:
... _ = f.write("Hello")
@@ -1860,7 +1860,7 @@ class SparkContext:
>>> import tempfile
>>> from pyspark import SparkFiles
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="addFile") as d:
... path1 = os.path.join(d, "test1.txt")
... with open(path1, "w") as f:
... _ = f.write("100")
@@ -1984,7 +1984,7 @@ class SparkContext:
>>> import zipfile
>>> from pyspark import SparkFiles
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="addArchive") as d:
... path = os.path.join(d, "test.txt")
... with open(path, "w") as f:
... _ = f.write("100")
diff --git a/python/pyspark/files.py b/python/pyspark/files.py
index 8044cf48a3f5..92130389d975 100644
--- a/python/pyspark/files.py
+++ b/python/pyspark/files.py
@@ -73,7 +73,7 @@ class SparkFiles:
>>> import tempfile
>>> from pyspark import SparkFiles
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="get") as d:
... path1 = os.path.join(d, "test.txt")
... with open(path1, "w") as f:
... _ = f.write("100")
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py
b/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py
index 26eb4230df35..db9a29804808 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_classification.py
@@ -132,7 +132,7 @@ class ClassificationTestsMixin:
self._check_result(local_transform_result, expected_predictions,
expected_probabilities)
def test_save_load(self):
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_save_load") as tmp_dir:
estimator = LORV2(maxIter=2, numTrainWorkers=2, learningRate=0.001)
local_path = os.path.join(tmp_dir, "estimator")
estimator.saveToLocal(local_path)
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_evaluation.py
b/python/pyspark/ml/tests/connect/test_legacy_mode_evaluation.py
index 6a6d6b183d1b..ae01031ff462 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_evaluation.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_evaluation.py
@@ -87,7 +87,7 @@ class EvaluationTestsMixin:
np.testing.assert_almost_equal(r2_local, expected_r2)
# Test save / load
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_regressor_evaluator") as
tmp_dir:
r2_evaluator.saveToLocal(f"{tmp_dir}/ev")
loaded_evaluator =
RegressionEvaluator.loadFromLocal(f"{tmp_dir}/ev")
assert loaded_evaluator.getMetricName() == "r2"
@@ -133,7 +133,7 @@ class EvaluationTestsMixin:
np.testing.assert_almost_equal(auprc_local, expected_auprc,
decimal=2)
# Test save / load
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with
tempfile.TemporaryDirectory(prefix="test_binary_classifier_evaluator") as
tmp_dir:
auprc_evaluator.saveToLocal(f"{tmp_dir}/ev")
loaded_evaluator =
RegressionEvaluator.loadFromLocal(f"{tmp_dir}/ev")
assert loaded_evaluator.getMetricName() == "areaUnderPR"
@@ -170,7 +170,7 @@ class EvaluationTestsMixin:
np.testing.assert_almost_equal(accuracy_local, expected_accuracy,
decimal=2)
# Test save / load
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with
tempfile.TemporaryDirectory(prefix="test_multiclass_classifier_evaluator") as
tmp_dir:
accuracy_evaluator.saveToLocal(f"{tmp_dir}/ev")
loaded_evaluator =
RegressionEvaluator.loadFromLocal(f"{tmp_dir}/ev")
assert loaded_evaluator.getMetricName() == "accuracy"
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
index 8e07879d8a26..9565b3a09a5b 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_feature.py
@@ -69,7 +69,7 @@ class FeatureTestsMixin:
np.testing.assert_allclose(list(local_transform_result.scaled_features),
expected_result)
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_max_abs_scaler") as
tmp_dir:
estimator_path = os.path.join(tmp_dir, "estimator")
scaler.saveToLocal(estimator_path)
loaded_scaler = MaxAbsScaler.loadFromLocal(estimator_path)
@@ -124,7 +124,7 @@ class FeatureTestsMixin:
np.testing.assert_allclose(list(local_transform_result.scaled_features),
expected_result)
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_standard_scaler") as
tmp_dir:
estimator_path = os.path.join(tmp_dir, "estimator")
scaler.saveToLocal(estimator_path)
loaded_scaler = StandardScaler.loadFromLocal(estimator_path)
@@ -176,7 +176,7 @@ class FeatureTestsMixin:
result2[1][1] = np.nan
np.testing.assert_allclose(result2, expected_result)
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_array_assembler") as
tmp_dir:
save_path = os.path.join(tmp_dir, "assembler")
assembler1.saveToLocal(save_path)
loaded_assembler = ArrayAssembler.loadFromLocal(save_path)
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py
b/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py
index 0fd0fd63ffce..104aff17e0b2 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_pipeline.py
@@ -91,7 +91,7 @@ class PipelineTestsMixin:
pd.testing.assert_frame_equal(local_eval_dataset,
local_eval_dataset_copy)
self._check_result(local_transform_result2, expected_predictions,
expected_probabilities)
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_pipeline") as tmp_dir:
pipeline_local_path = os.path.join(tmp_dir, "pipeline")
pipeline.saveToLocal(pipeline_local_path)
loaded_pipeline = Pipeline.loadFromLocal(pipeline_local_path)
diff --git a/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py
b/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py
index 69e33a36d4f3..7f26788c137f 100644
--- a/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py
+++ b/python/pyspark/ml/tests/connect/test_legacy_mode_tuning.py
@@ -229,7 +229,7 @@ class CrossValidatorTestsMixin:
assert instance.getEstimatorParamMaps() ==
loaded_instance.getEstimatorParamMaps()
# Test save / load
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with
tempfile.TemporaryDirectory(prefix="test_crossvalidator_on_pipeline") as
tmp_dir:
cv.saveToLocal(f"{tmp_dir}/cv")
loaded_cv = CrossValidator.loadFromLocal(f"{tmp_dir}/cv")
diff --git a/python/pyspark/ml/tests/test_als.py
b/python/pyspark/ml/tests/test_als.py
index 8eec0d937768..3027b3ab9fd6 100644
--- a/python/pyspark/ml/tests/test_als.py
+++ b/python/pyspark/ml/tests/test_als.py
@@ -39,7 +39,7 @@ class ALSTest(ReusedSQLTestCase):
seed=42,
).fit(data)
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_ambiguous_column") as d:
model.write().overwrite().save(d)
loaded_model = ALSModel().load(d)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index bbc3432980ec..cb5f1746b118 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2970,7 +2970,7 @@ class RDD(Generic[T_co]):
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with
tempfile.TemporaryDirectory(prefix="saveAsNewAPIHadoopDataset") as d:
... path = os.path.join(d, "new_hadoop_file")
...
... # Create the conf for writing
@@ -3059,7 +3059,7 @@ class RDD(Generic[T_co]):
>>> output_format_class =
"org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsNewAPIHadoopFile")
as d:
... path = os.path.join(d, "hadoop_file")
...
... # Write a temporary Hadoop file
@@ -3129,7 +3129,7 @@ class RDD(Generic[T_co]):
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsHadoopDataset") as
d:
... path = os.path.join(d, "old_hadoop_file")
...
... # Create the conf for writing
@@ -3224,7 +3224,7 @@ class RDD(Generic[T_co]):
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsHadoopFile") as d:
... path = os.path.join(d, "old_hadoop_file")
...
... # Write a temporary Hadoop file
@@ -3290,7 +3290,7 @@ class RDD(Generic[T_co]):
Set the related classes
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsSequenceFile") as d:
... path = os.path.join(d, "sequence_file")
...
... # Write a temporary sequence file
@@ -3332,7 +3332,7 @@ class RDD(Generic[T_co]):
--------
>>> import os
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsPickleFile") as d:
... path = os.path.join(d, "pickle_file")
...
... # Write a temporary pickled file
@@ -3374,7 +3374,7 @@ class RDD(Generic[T_co]):
>>> import tempfile
>>> from fileinput import input
>>> from glob import glob
- >>> with tempfile.TemporaryDirectory() as d1:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsTextFile1") as d1:
... path1 = os.path.join(d1, "text_file1")
...
... # Write a temporary text file
@@ -3386,7 +3386,7 @@ class RDD(Generic[T_co]):
Empty lines are tolerated when saving to text files.
- >>> with tempfile.TemporaryDirectory() as d2:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsTextFile2") as d2:
... path2 = os.path.join(d2, "text2_file2")
...
... # Write another temporary text file
@@ -3399,7 +3399,7 @@ class RDD(Generic[T_co]):
Using compressionCodecClass
>>> from fileinput import input, hook_compressed
- >>> with tempfile.TemporaryDirectory() as d3:
+ >>> with tempfile.TemporaryDirectory(prefix="saveAsTextFile3") as d3:
... path3 = os.path.join(d3, "text3")
... codec = "org.apache.hadoop.io.compress.GzipCodec"
...
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 6595659a4dae..d70bd89baeda 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -836,7 +836,7 @@ class Catalog:
Creating an external table
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="createTable") as d:
... _ = spark.catalog.createTable(
... "tbl2", schema=spark.range(1).schema, path=d,
source='parquet')
>>> _ = spark.sql("DROP TABLE tbl2")
@@ -1119,7 +1119,7 @@ class Catalog:
The example below caches a table, and then removes the data.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="refreshTable") as d:
... _ = spark.sql("DROP TABLE IF EXISTS tbl1")
... _ = spark.sql(
... "CREATE TABLE tbl1 (col STRING) USING TEXT LOCATION
'{}'".format(d))
@@ -1170,7 +1170,7 @@ class Catalog:
the partitioned table. After that, it recovers the partitions.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="recoverPartitions") as d:
... _ = spark.sql("DROP TABLE IF EXISTS tbl1")
... spark.range(1).selectExpr(
... "id as key", "id as
value").write.partitionBy("key").mode("overwrite").save(d)
@@ -1209,7 +1209,7 @@ class Catalog:
The example below caches a table, and then removes the data.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="refreshByPath") as d:
... _ = spark.sql("DROP TABLE IF EXISTS tbl1")
... _ = spark.sql(
... "CREATE TABLE tbl1 (col STRING) USING TEXT LOCATION
'{}'".format(d))
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index dad031fc2625..5c5c263b8156 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -586,7 +586,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
>>> type(df.writeStream)
<class '...streaming.readwriter.DataStreamWriter'>
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="writeStream") as d:
... # Create a table with Rate source.
... df.writeStream.toTable(
... "my_table", checkpointLocation=d)
@@ -1139,7 +1139,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
>>> import tempfile
>>> df = spark.createDataFrame([
... (14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="checkpoint") as d:
... spark.sparkContext.setCheckpointDir("/tmp/bb")
... df.checkpoint(False)
DataFrame[age: bigint, name: string]
@@ -6766,7 +6766,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
Examples
--------
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="inputFiles") as d:
... # Write a single-row DataFrame into a JSON file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]
diff --git a/python/pyspark/sql/protobuf/functions.py
b/python/pyspark/sql/protobuf/functions.py
index acb1a17efbd6..5a99aed55f74 100644
--- a/python/pyspark/sql/protobuf/functions.py
+++ b/python/pyspark/sql/protobuf/functions.py
@@ -94,7 +94,7 @@ def from_protobuf(
... '26F746F33')
>>> # Writing a protobuf description into a file, generated by using
>>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
file
- >>> with tempfile.TemporaryDirectory() as tmp_dir:
+ >>> with tempfile.TemporaryDirectory(prefix="from_protobuf") as tmp_dir:
... desc_file_path = "%s/pyspark_test.desc" % tmp_dir
... with open(desc_file_path, "wb") as f:
... _ = f.write(bytearray.fromhex(desc_hex))
@@ -224,7 +224,7 @@ def to_protobuf(
... '26F746F33')
>>> # Writing a protobuf description into a file, generated by using
>>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto
file
- >>> with tempfile.TemporaryDirectory() as tmp_dir:
+ >>> with tempfile.TemporaryDirectory(prefix="to_protobuf") as tmp_dir:
... desc_file_path = "%s/pyspark_test.desc" % tmp_dir
... with open(desc_file_path, "wb") as f:
... _ = f.write(bytearray.fromhex(desc_hex))
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index db9220fc48bb..9eb5d99dfa4c 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -96,7 +96,7 @@ class DataFrameReader(OptionUtils):
Write a DataFrame into a JSON file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="format") as d:
... # Write a DataFrame into a JSON file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]
@@ -139,7 +139,7 @@ class DataFrameReader(OptionUtils):
Specify the schema with reading a CSV file.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="schema") as d:
... spark.read.schema("col0 INT, col1
DOUBLE").format("csv").load(d).printSchema()
root
|-- col0: integer (nullable = true)
@@ -187,7 +187,7 @@ class DataFrameReader(OptionUtils):
Specify the option 'nullValue' with reading a CSV file.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="option") as d:
... # Write a DataFrame into a CSV file
... df = spark.createDataFrame([{"age": 100, "name": "Hyukjin
Kwon"}])
... df.write.mode("overwrite").format("csv").save(d)
@@ -231,7 +231,7 @@ class DataFrameReader(OptionUtils):
Specify the option 'nullValue' and 'header' with reading a CSV file.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="options") as d:
... # Write a DataFrame into a CSV file with a header.
... df = spark.createDataFrame([{"age": 100, "name": "Hyukjin
Kwon"}])
... df.write.option("header",
True).mode("overwrite").format("csv").save(d)
@@ -283,7 +283,7 @@ class DataFrameReader(OptionUtils):
Load a CSV file with format, schema and options specified.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="load") as d:
... # Write a DataFrame into a CSV file with a header
... df = spark.createDataFrame([{"age": 100, "name": "Hyukjin
Kwon"}])
... df.write.option("header",
True).mode("overwrite").format("csv").save(d)
@@ -383,7 +383,7 @@ class DataFrameReader(OptionUtils):
Example 1: Write a DataFrame into a JSON file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="json1") as d:
... # Write a DataFrame into a JSON file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin"}]
@@ -399,8 +399,8 @@ class DataFrameReader(OptionUtils):
Example 2: Read JSON from multiple files in a directory
- >>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d1,
tempfile.TemporaryDirectory() as d2:
+ >>> from tempfile import TemporaryDirectory
+ >>> with TemporaryDirectory(prefix="json2") as d1,
TemporaryDirectory(prefix="json3") as d2:
... # Write a DataFrame into a JSON file
... spark.createDataFrame(
... [{"age": 30, "name": "Bob"}]
@@ -421,7 +421,7 @@ class DataFrameReader(OptionUtils):
Example 3: Read JSON with a custom schema
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="json4") as d:
... # Write a DataFrame into a JSON file
... spark.createDataFrame(
... [{"age": 30, "name": "Bob"}]
@@ -564,7 +564,7 @@ class DataFrameReader(OptionUtils):
Write a DataFrame into a Parquet file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="parquet1") as d:
... # Write a DataFrame into a Parquet file.
... df.write.mode("overwrite").format("parquet").save(d)
...
@@ -580,7 +580,7 @@ class DataFrameReader(OptionUtils):
Read a Parquet file with a specific column.
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="parquet2") as d:
... df.write.mode("overwrite").format("parquet").save(d)
...
... # Read the Parquet file with only the 'name' column.
@@ -595,15 +595,16 @@ class DataFrameReader(OptionUtils):
Read multiple Parquet files and merge schema.
- >>> with tempfile.TemporaryDirectory() as d1,
tempfile.TemporaryDirectory() as d2:
- ... df.write.mode("overwrite").format("parquet").save(d1)
- ... df2.write.mode("overwrite").format("parquet").save(d2)
+ >>> with tempfile.TemporaryDirectory(prefix="parquet3") as d1:
+ ... with tempfile.TemporaryDirectory(prefix="parquet4") as d2:
+ ... df.write.mode("overwrite").format("parquet").save(d1)
+ ... df2.write.mode("overwrite").format("parquet").save(d2)
...
- ... spark.read.option(
- ... "mergeSchema", "true"
- ... ).parquet(d1, d2).select(
- ... "name", "age", "height"
- ... ).orderBy("name", "age").show()
+ ... spark.read.option(
+ ... "mergeSchema", "true"
+ ... ).parquet(d1, d2).select(
+ ... "name", "age", "height"
+ ... ).orderBy("name", "age").show()
+-----+----+------+
| name| age|height|
+-----+----+------+
@@ -675,7 +676,7 @@ class DataFrameReader(OptionUtils):
Write a DataFrame into a text file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="text") as d:
... # Write a DataFrame into a text file
... df = spark.createDataFrame([("a",), ("b",), ("c",)],
schema=["alphabets"])
... df.write.mode("overwrite").format("text").save(d)
@@ -775,7 +776,7 @@ class DataFrameReader(OptionUtils):
Write a DataFrame into a CSV file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="csv") as d:
... # Write a DataFrame into a CSV file
... df = spark.createDataFrame([{"age": 100, "name": "Hyukjin
Kwon"}])
... df.write.mode("overwrite").format("csv").save(d)
@@ -911,7 +912,7 @@ class DataFrameReader(OptionUtils):
Write a DataFrame into a XML file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="xml") as d:
... # Write a DataFrame into a XML file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]
@@ -1015,7 +1016,7 @@ class DataFrameReader(OptionUtils):
Write a DataFrame into a ORC file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="orc") as d:
... # Write a DataFrame into a ORC file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]
@@ -1201,7 +1202,7 @@ class DataFrameWriter(OptionUtils):
Raise an error when writing to an existing path.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="mode1") as d:
... spark.createDataFrame(
... [{"age": 80, "name": "Xinrong Meng"}]
... ).write.mode("error").format("parquet").save(d) # doctest:
+SKIP
@@ -1211,7 +1212,7 @@ class DataFrameWriter(OptionUtils):
Write a Parquet file back with various options, and read it back.
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="mode2") as d:
... # Overwrite the path with a new Parquet file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]
@@ -1263,7 +1264,7 @@ class DataFrameWriter(OptionUtils):
Write a DataFrame into a Parquet file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="format") as d:
... # Write a DataFrame into a Parquet file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]
@@ -1304,7 +1305,7 @@ class DataFrameWriter(OptionUtils):
Specify the option 'nullValue' with writing a CSV file.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="option") as d:
... # Write a DataFrame into a CSV file with 'nullValue' option
set to 'Hyukjin Kwon'.
... df = spark.createDataFrame([(100, None)], "age INT, name
STRING")
... df.write.option("nullValue", "Hyukjin
Kwon").mode("overwrite").format("csv").save(d)
@@ -1353,7 +1354,7 @@ class DataFrameWriter(OptionUtils):
... StructField("name",StringType(),True),
... ])
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="options") as d:
... # Write a DataFrame into a CSV file with 'nullValue' option
set to 'Hyukjin Kwon',
... # and 'header' option set to `True`.
... df = spark.createDataFrame([(100, None)], schema=schema)
@@ -1402,7 +1403,7 @@ class DataFrameWriter(OptionUtils):
>>> import tempfile
>>> import os
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="partitionBy") as d:
... # Write a DataFrame into a Parquet file in a partitioned
manner.
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}, {"age": 120,
"name": "Ruifeng Zheng"}]
@@ -1656,7 +1657,7 @@ class DataFrameWriter(OptionUtils):
Write a DataFrame into a JSON file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="save") as d:
... # Write a DataFrame into a JSON file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]
@@ -1850,7 +1851,7 @@ class DataFrameWriter(OptionUtils):
Write a DataFrame into a JSON file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="json") as d:
... # Write a DataFrame into a JSON file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]
@@ -1918,7 +1919,7 @@ class DataFrameWriter(OptionUtils):
Write a DataFrame into a Parquet file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="parquet") as d:
... # Write a DataFrame into a Parquet file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]
@@ -1973,7 +1974,7 @@ class DataFrameWriter(OptionUtils):
Write a DataFrame into a text file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="text") as d:
... # Write a DataFrame into a text file
... df = spark.createDataFrame([("a",), ("b",), ("c",)],
schema=["alphabets"])
... df.write.mode("overwrite").text(d)
@@ -2046,7 +2047,7 @@ class DataFrameWriter(OptionUtils):
Write a DataFrame into a CSV file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="csv") as d:
... # Write a DataFrame into a CSV file
... df = spark.createDataFrame([{"age": 100, "name": "Hyukjin
Kwon"}])
... df.write.csv(d, mode="overwrite")
@@ -2129,7 +2130,7 @@ class DataFrameWriter(OptionUtils):
Write a DataFrame into a XML file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="xml") as d:
... # Write a DataFrame into a XML file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]
@@ -2203,7 +2204,7 @@ class DataFrameWriter(OptionUtils):
Write a DataFrame into a ORC file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="orc") as d:
... # Write a DataFrame into a ORC file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 0a19921f7286..c70a28f58eca 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -1748,7 +1748,7 @@ class SparkSession(SparkConversionMixin):
Write a DataFrame into a JSON file and read it back.
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="read") as d:
... # Write a DataFrame into a JSON file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]
diff --git a/python/pyspark/sql/streaming/readwriter.py
b/python/pyspark/sql/streaming/readwriter.py
index 01441ee77ac1..41a83355ab6c 100644
--- a/python/pyspark/sql/streaming/readwriter.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -109,7 +109,7 @@ class DataStreamReader(OptionUtils):
>>> import tempfile
>>> import time
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="format") as d:
... # Write a temporary text file to read it.
... spark.createDataFrame(
... [("hello",),
("this",)]).write.mode("overwrite").format("text").save(d)
@@ -156,7 +156,7 @@ class DataStreamReader(OptionUtils):
>>> import tempfile
>>> import time
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="schema") as d:
... # Start a streaming query to read the CSV file.
... spark.readStream.schema("col0 INT, col1
STRING").format("csv").load(d).printSchema()
root
@@ -280,7 +280,7 @@ class DataStreamReader(OptionUtils):
>>> import tempfile
>>> import time
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="load") as d:
... # Write a temporary JSON file to read it.
... spark.createDataFrame(
... [(100, "Hyukjin Kwon"),], ["age", "name"]
@@ -375,7 +375,7 @@ class DataStreamReader(OptionUtils):
>>> import tempfile
>>> import time
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="json") as d:
... # Write a temporary JSON file to read it.
... spark.createDataFrame(
... [(100, "Hyukjin Kwon"),], ["age", "name"]
@@ -448,7 +448,7 @@ class DataStreamReader(OptionUtils):
>>> import tempfile
>>> import time
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="orc") as d:
... # Write a temporary ORC file to read it.
... spark.range(10).write.mode("overwrite").format("orc").save(d)
...
@@ -507,7 +507,7 @@ class DataStreamReader(OptionUtils):
>>> import tempfile
>>> import time
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="parquet") as d:
... # Write a temporary Parquet file to read it.
...
spark.range(10).write.mode("overwrite").format("parquet").save(d)
...
@@ -577,7 +577,7 @@ class DataStreamReader(OptionUtils):
>>> import tempfile
>>> import time
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="text") as d:
... # Write a temporary text file to read it.
... spark.createDataFrame(
... [("hello",),
("this",)]).write.mode("overwrite").format("text").save(d)
@@ -673,7 +673,7 @@ class DataStreamReader(OptionUtils):
>>> import tempfile
>>> import time
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="csv") as d:
... # Write a temporary text file to read it.
... spark.createDataFrame([(1,
"2"),]).write.mode("overwrite").format("csv").save(d)
...
@@ -781,7 +781,7 @@ class DataStreamReader(OptionUtils):
>>> import tempfile
>>> import time
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="xml") as d:
... # Write a DataFrame into a XML file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]
@@ -852,7 +852,7 @@ class DataStreamReader(OptionUtils):
>>> import tempfile
>>> import time
>>> _ = spark.sql("DROP TABLE IF EXISTS my_table")
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="table") as d:
... # Create a table with Rate source.
... q1 =
spark.readStream.format("rate").load().writeStream.toTable(
... "my_table", checkpointLocation=d)
@@ -984,12 +984,13 @@ class DataStreamWriter:
>>> import tempfile
>>> import time
- >>> with tempfile.TemporaryDirectory() as d,
tempfile.TemporaryDirectory() as cp:
- ... df = spark.readStream.format("rate").load()
- ... q = df.writeStream.format("csv").option("checkpointLocation",
cp).start(d)
- ... time.sleep(5)
- ... q.stop()
- ... spark.read.schema("timestamp TIMESTAMP, value
STRING").csv(d).show()
+ >>> with tempfile.TemporaryDirectory(prefix="format1") as d:
+ ... with tempfile.TemporaryDirectory(prefix="format2") as cp:
+ ... df = spark.readStream.format("rate").load()
+ ... q =
df.writeStream.format("csv").option("checkpointLocation", cp).start(d)
+ ... time.sleep(5)
+ ... q.stop()
+ ... spark.read.schema("timestamp TIMESTAMP, value
STRING").csv(d).show()
+...---------+-----+
|...timestamp|value|
+...---------+-----+
@@ -1104,13 +1105,14 @@ class DataStreamWriter:
>>> import tempfile
>>> import time
- >>> with tempfile.TemporaryDirectory() as d,
tempfile.TemporaryDirectory() as cp:
- ... df = spark.readStream.format("rate").option("rowsPerSecond",
10).load()
- ... q = df.writeStream.partitionBy(
- ...
"timestamp").format("parquet").option("checkpointLocation", cp).start(d)
- ... time.sleep(5)
- ... q.stop()
- ... spark.read.schema(df.schema).parquet(d).show()
+ >>> with tempfile.TemporaryDirectory(prefix="partitionBy1") as d:
+ ... with tempfile.TemporaryDirectory(prefix="partitionBy2") as cp:
+ ... df =
spark.readStream.format("rate").option("rowsPerSecond", 10).load()
+ ... q = df.writeStream.partitionBy(
+ ...
"timestamp").format("parquet").option("checkpointLocation", cp).start(d)
+ ... time.sleep(5)
+ ... q.stop()
+ ... spark.read.schema(df.schema).parquet(d).show()
+...---------+-----+
|...timestamp|value|
+...---------+-----+
@@ -1703,7 +1705,7 @@ class DataStreamWriter:
>>> import tempfile
>>> import time
>>> _ = spark.sql("DROP TABLE IF EXISTS my_table2")
- >>> with tempfile.TemporaryDirectory() as d:
+ >>> with tempfile.TemporaryDirectory(prefix="toTable") as d:
... # Create a table with Rate source.
... q = spark.readStream.format("rate").option(
... "rowsPerSecond", 10).load().writeStream.toTable(
diff --git a/python/pyspark/sql/tests/connect/client/test_artifact.py
b/python/pyspark/sql/tests/connect/client/test_artifact.py
index eca6e7de7634..f1cbf637b92a 100644
--- a/python/pyspark/sql/tests/connect/client/test_artifact.py
+++ b/python/pyspark/sql/tests/connect/client/test_artifact.py
@@ -34,7 +34,7 @@ if should_test_connect:
class ArtifactTestsMixin:
def check_add_pyfile(self, spark_session):
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="check_add_pyfile") as d:
pyfile_path = os.path.join(d, "my_pyfile.py")
with open(pyfile_path, "w") as f:
f.write("my_func = lambda: 10")
@@ -60,7 +60,7 @@ class ArtifactTestsMixin:
)
def test_artifacts_cannot_be_overwritten(self):
- with tempfile.TemporaryDirectory() as d:
+ with
tempfile.TemporaryDirectory(prefix="test_artifacts_cannot_be_overwritten") as d:
pyfile_path = os.path.join(d, "my_pyfile.py")
with open(pyfile_path, "w+") as f:
f.write("my_func = lambda: 10")
@@ -79,7 +79,7 @@ class ArtifactTestsMixin:
self.spark.addArtifacts(pyfile_path, pyfile=True)
def check_add_zipped_package(self, spark_session):
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="check_add_zipped_package") as
d:
package_path = os.path.join(d, "my_zipfile")
os.mkdir(package_path)
pyfile_path = os.path.join(package_path, "__init__.py")
@@ -108,7 +108,7 @@ class ArtifactTestsMixin:
)
def check_add_archive(self, spark_session):
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="check_add_archive") as d:
archive_path = os.path.join(d, "my_archive")
os.mkdir(archive_path)
pyfile_path = os.path.join(archive_path, "my_file.txt")
@@ -144,7 +144,7 @@ class ArtifactTestsMixin:
)
def check_add_file(self, spark_session):
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="check_add_file") as d:
file_path = os.path.join(d, "my_file.txt")
with open(file_path, "w") as f:
f.write("Hello world!!")
@@ -393,8 +393,8 @@ class ArtifactTests(ReusedConnectTestCase,
ArtifactTestsMixin):
self.assertEqual(artifact2.data.data, data)
def test_copy_from_local_to_fs(self):
- with tempfile.TemporaryDirectory() as d:
- with tempfile.TemporaryDirectory() as d2:
+ with tempfile.TemporaryDirectory(prefix="test_copy_from_local_to_fs1")
as d:
+ with
tempfile.TemporaryDirectory(prefix="test_copy_from_local_to_fs2") as d2:
file_path = os.path.join(d, "file1")
dest_path = os.path.join(d2, "file1_dest")
file_content = "test_copy_from_local_to_FS"
@@ -417,7 +417,7 @@ class ArtifactTests(ReusedConnectTestCase,
ArtifactTestsMixin):
self.assertEqual(self.artifact_manager.is_cached_artifact(expected_hash), True)
def test_add_not_existing_artifact(self):
- with tempfile.TemporaryDirectory() as d:
+ with
tempfile.TemporaryDirectory(prefix="test_add_not_existing_artifact") as d:
with self.assertRaises(FileNotFoundError):
self.artifact_manager.add_artifacts(
os.path.join(d, "not_existing"), file=True, pyfile=False,
archive=False
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 3f5484f48764..88c4754029b6 100755
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -291,7 +291,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
self.assertEqual(len(data.index), 10)
def test_json(self):
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_json") as d:
# Write a DataFrame into a JSON file
self.spark.createDataFrame([{"age": 100, "name": "Hyukjin
Kwon"}]).write.mode(
"overwrite"
@@ -376,7 +376,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
def test_parquet(self):
# SPARK-41445: Implement DataFrameReader.parquet
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_parquet") as d:
# Write a DataFrame into a JSON file
self.spark.createDataFrame([{"age": 100, "name": "Hyukjin
Kwon"}]).write.mode(
"overwrite"
@@ -388,7 +388,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
def test_text(self):
# SPARK-41849: Implement DataFrameReader.text
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_text") as d:
# Write a DataFrame into a text file
self.spark.createDataFrame(
[{"name": "Sandeep Singh"}, {"name": "Hyukjin Kwon"}]
@@ -398,7 +398,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
def test_csv(self):
# SPARK-42011: Implement DataFrameReader.csv
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_csv") as d:
# Write a DataFrame into a text file
self.spark.createDataFrame(
[{"name": "Sandeep Singh"}, {"name": "Hyukjin Kwon"}]
@@ -409,7 +409,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
def test_multi_paths(self):
# SPARK-42041: DataFrameReader should support list of paths
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_multi_paths1") as d:
text_files = []
for i in range(0, 3):
text_file = f"{d}/text-{i}.text"
@@ -421,7 +421,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
self.spark.read.text(text_files).collect(),
)
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_multi_paths2") as d:
json_files = []
for i in range(0, 5):
json_file = f"{d}/json-{i}.json"
@@ -435,7 +435,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
def test_orc(self):
# SPARK-42012: Implement DataFrameReader.orc
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_orc") as d:
# Write a DataFrame into a text file
self.spark.createDataFrame(
[{"name": "Sandeep Singh"}, {"name": "Hyukjin Kwon"}]
@@ -2474,7 +2474,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
)
def test_write_operations(self):
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_write_operations") as d:
df = self.connect.range(50)
df.write.mode("overwrite").format("csv").save(d)
@@ -2483,7 +2483,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
cd = ndf.collect()
self.assertEqual(set(df.collect()), set(cd))
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_write_operations") as d:
df = self.connect.range(50)
df.write.mode("overwrite").csv(d, lineSep="|")
@@ -3120,7 +3120,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
def test_simple_udt_from_read(self):
from pyspark.ml.linalg import Matrices, Vectors
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_simple_udt_from_read")
as d:
path1 = f"{d}/df1.parquet"
self.spark.createDataFrame(
[(i % 3, PythonOnlyPoint(float(i), float(i))) for i in
range(10)],
diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py
b/python/pyspark/sql/tests/streaming/test_streaming.py
index 31486feae156..abfacdbbf059 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming.py
@@ -358,7 +358,7 @@ class StreamingTestsMixin:
)
def test_streaming_write_to_table(self):
- with self.table("output_table"), tempfile.TemporaryDirectory() as
tmpdir:
+ with self.table("output_table"),
tempfile.TemporaryDirectory(prefix="to_table") as tmpdir:
df = self.spark.readStream.format("rate").option("rowsPerSecond",
10).load()
q = df.writeStream.toTable("output_table", format="parquet",
checkpointLocation=tmpdir)
self.assertTrue(q.isActive)
diff --git a/python/pyspark/sql/tests/test_catalog.py
b/python/pyspark/sql/tests/test_catalog.py
index 278fbbb2ba51..bc6bfdd2759f 100644
--- a/python/pyspark/sql/tests/test_catalog.py
+++ b/python/pyspark/sql/tests/test_catalog.py
@@ -477,7 +477,7 @@ class CatalogTestsMixin:
import tempfile
spark = self.spark
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_refresh_table") as
tmp_dir:
with self.table("my_tab"):
spark.sql(
"CREATE TABLE my_tab (col STRING) USING TEXT LOCATION
'{}'".format(tmp_dir)
diff --git a/python/pyspark/sql/tests/test_python_datasource.py
b/python/pyspark/sql/tests/test_python_datasource.py
index 6ba4b68b02ba..343f8f248327 100644
--- a/python/pyspark/sql/tests/test_python_datasource.py
+++ b/python/pyspark/sql/tests/test_python_datasource.py
@@ -329,14 +329,14 @@ class BasePythonDataSourceTestsMixin:
self.spark.dataSource.register(data_source)
input_path = os.path.join(SPARK_HOME,
"python/test_support/sql/people.json")
df = self.spark.read.json(input_path)
- with tempfile.TemporaryDirectory() as d:
+ with
tempfile.TemporaryDirectory(prefix="test_custom_json_data_source_write") as d:
df.write.format("my-json").mode("append").save(d)
assertDataFrameEqual(self.spark.read.json(d),
self.spark.read.json(input_path))
def test_custom_json_data_source_commit(self):
data_source = self._get_test_json_data_source()
self.spark.dataSource.register(data_source)
- with tempfile.TemporaryDirectory() as d:
+ with
tempfile.TemporaryDirectory(prefix="test_custom_json_data_source_commit") as d:
self.spark.range(0, 5, 1,
3).write.format("my-json").mode("append").save(d)
with open(os.path.join(d, "_success.txt"), "r") as file:
text = file.read()
@@ -345,7 +345,7 @@ class BasePythonDataSourceTestsMixin:
def test_custom_json_data_source_abort(self):
data_source = self._get_test_json_data_source()
self.spark.dataSource.register(data_source)
- with tempfile.TemporaryDirectory() as d:
+ with
tempfile.TemporaryDirectory(prefix="test_custom_json_data_source_abort") as d:
with self.assertRaises(PythonException):
self.spark.range(0, 8, 1,
3).write.format("my-json").mode("append").save(d)
with open(os.path.join(d, "_failed.txt"), "r") as file:
diff --git a/python/pyspark/sql/tests/test_udf_profiler.py
b/python/pyspark/sql/tests/test_udf_profiler.py
index 99684a20af41..557b4daa8550 100644
--- a/python/pyspark/sql/tests/test_udf_profiler.py
+++ b/python/pyspark/sql/tests/test_udf_profiler.py
@@ -82,7 +82,7 @@ class UDFProfilerTests(unittest.TestCase):
finally:
sys.stdout = old_stdout
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_udf_profiler") as d:
self.sc.dump_profiles(d)
for i, udf_name in enumerate(["add1", "add2", "add1", "add2"]):
@@ -185,7 +185,7 @@ class UDFProfiler2TestsMixin:
with self.trap_stdout() as io_all:
self.spark.profile.show(type="perf")
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_perf_profiler_udf") as d:
self.spark.profile.dump(d, type="perf")
for id in self.profile_results:
diff --git a/python/pyspark/sql/tests/test_udtf.py
b/python/pyspark/sql/tests/test_udtf.py
index 41321f556ac6..3a17f7013bd6 100644
--- a/python/pyspark/sql/tests/test_udtf.py
+++ b/python/pyspark/sql/tests/test_udtf.py
@@ -408,7 +408,7 @@ class BaseUDTFTestsMixin:
)
def test_udtf_cleanup_with_exception_in_eval(self):
- with tempfile.TemporaryDirectory() as d:
+ with
tempfile.TemporaryDirectory(prefix="test_udtf_cleanup_with_exception_in_eval")
as d:
path = os.path.join(d, "file.txt")
@udtf(returnType="x: int")
@@ -437,7 +437,9 @@ class BaseUDTFTestsMixin:
self.assertEqual(data, "cleanup")
def test_udtf_cleanup_with_exception_in_terminate(self):
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(
+ prefix="test_udtf_cleanup_with_exception_in_terminate"
+ ) as d:
path = os.path.join(d, "file.txt")
@udtf(returnType="x: int")
@@ -942,7 +944,7 @@ class BaseUDTFTestsMixin:
)
def test_udtf_pickle_error(self):
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_udtf_pickle_error") as d:
file = os.path.join(d, "file.txt")
file_obj = open(file, "w")
@@ -1715,7 +1717,7 @@ class BaseUDTFTestsMixin:
self.sc.addPyFile(path)
def test_udtf_with_analyze_using_pyfile(self):
- with tempfile.TemporaryDirectory() as d:
+ with
tempfile.TemporaryDirectory(prefix="test_udtf_with_analyze_using_pyfile") as d:
pyfile_path = os.path.join(d, "my_pyfile.py")
with open(pyfile_path, "w") as f:
f.write("my_func = lambda: 'col1'")
@@ -1752,7 +1754,7 @@ class BaseUDTFTestsMixin:
assertDataFrameEqual(df, [Row(col1=10), Row(col1=100)])
def test_udtf_with_analyze_using_zipped_package(self):
- with tempfile.TemporaryDirectory() as d:
+ with
tempfile.TemporaryDirectory(prefix="test_udtf_with_analyze_using_zipped_package")
as d:
package_path = os.path.join(d, "my_zipfile")
os.mkdir(package_path)
pyfile_path = os.path.join(package_path, "__init__.py")
@@ -1795,7 +1797,7 @@ class BaseUDTFTestsMixin:
self.sc.addArchive(path)
def test_udtf_with_analyze_using_archive(self):
- with tempfile.TemporaryDirectory() as d:
+ with
tempfile.TemporaryDirectory(prefix="test_udtf_with_analyze_using_archive") as d:
archive_path = os.path.join(d, "my_archive")
os.mkdir(archive_path)
pyfile_path = os.path.join(archive_path, "my_file.txt")
@@ -1842,7 +1844,7 @@ class BaseUDTFTestsMixin:
self.sc.addFile(path)
def test_udtf_with_analyze_using_file(self):
- with tempfile.TemporaryDirectory() as d:
+ with
tempfile.TemporaryDirectory(prefix="test_udtf_with_analyze_using_file") as d:
file_path = os.path.join(d, "my_file.txt")
with open(file_path, "w") as f:
f.write("col1")
diff --git a/python/pyspark/tests/test_install_spark.py
b/python/pyspark/tests/test_install_spark.py
index c4188034a105..dee28af9a407 100644
--- a/python/pyspark/tests/test_install_spark.py
+++ b/python/pyspark/tests/test_install_spark.py
@@ -34,7 +34,7 @@ class SparkInstallationTestCase(unittest.TestCase):
# the Spark distribution.
spark_version, hadoop_version, hive_version =
checked_versions("3.0.1", "3", "2.3")
- with tempfile.TemporaryDirectory() as tmp_dir:
+ with tempfile.TemporaryDirectory(prefix="test_install_spark") as
tmp_dir:
install_spark(
dest=tmp_dir,
spark_version=spark_version,
diff --git a/python/pyspark/tests/test_memory_profiler.py
b/python/pyspark/tests/test_memory_profiler.py
index a7d3fff7887c..f0abdd03e243 100644
--- a/python/pyspark/tests/test_memory_profiler.py
+++ b/python/pyspark/tests/test_memory_profiler.py
@@ -106,7 +106,7 @@ class MemoryProfilerTests(PySparkTestCase):
self.sc.show_profiles()
self.assertTrue("plus_one" in fake_out.getvalue())
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_memory_profiler") as d:
self.sc.dump_profiles(d)
self.assertTrue(f"udf_{id}_memory.txt" in os.listdir(d))
@@ -235,7 +235,7 @@ class MemoryProfiler2TestsMixin:
with self.trap_stdout() as io_all:
self.spark.profile.show(type="memory")
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_memory_profiler_udf") as
d:
self.spark.profile.dump(d, type="memory")
for id in self.profile_results:
diff --git a/python/pyspark/tests/test_profiler.py
b/python/pyspark/tests/test_profiler.py
index a12bc99c54ae..09470bd47f7e 100644
--- a/python/pyspark/tests/test_profiler.py
+++ b/python/pyspark/tests/test_profiler.py
@@ -54,7 +54,7 @@ class ProfilerTests(PySparkTestCase):
self.assertTrue("heavy_foo" in io.getvalue())
sys.stdout = old_stdout
- with tempfile.TemporaryDirectory() as d:
+ with tempfile.TemporaryDirectory(prefix="test_profiler") as d:
self.sc.dump_profiles(d)
self.assertTrue("rdd_%d.pstats" % id in os.listdir(d))
diff --git a/python/pyspark/tests/test_shuffle.py
b/python/pyspark/tests/test_shuffle.py
index 4fb73607a2e0..f40bf5ea9d66 100644
--- a/python/pyspark/tests/test_shuffle.py
+++ b/python/pyspark/tests/test_shuffle.py
@@ -16,7 +16,7 @@
#
import random
import unittest
-import tempfile
+from tempfile import TemporaryDirectory
import os
from py4j.protocol import Py4JJavaError
@@ -67,16 +67,16 @@ class MergerTests(unittest.TestCase):
# SPARK-39179: Test shuffle of data with multiple location also check
# shuffle locations get randomized
- with tempfile.TemporaryDirectory() as tempdir1,
tempfile.TemporaryDirectory() as tempdir2:
+ with TemporaryDirectory(prefix="shf1") as d1,
TemporaryDirectory(prefix="shf2") as d2:
original = os.environ.get("SPARK_LOCAL_DIRS", None)
- os.environ["SPARK_LOCAL_DIRS"] = tempdir1 + "," + tempdir2
+ os.environ["SPARK_LOCAL_DIRS"] = d1 + "," + d2
try:
index_of_tempdir1 = [False, False]
for idx in range(10):
m = ExternalMerger(self.agg, 20)
- if m.localdirs[0].startswith(tempdir1):
+ if m.localdirs[0].startswith(d1):
index_of_tempdir1[0] = True
- elif m.localdirs[1].startswith(tempdir1):
+ elif m.localdirs[1].startswith(d1):
index_of_tempdir1[1] = True
m.mergeValues(self.data)
self.assertTrue(m.spills >= 1)
diff --git a/python/pyspark/util.py b/python/pyspark/util.py
index 4a828d6bfc94..ec9c2489b41e 100644
--- a/python/pyspark/util.py
+++ b/python/pyspark/util.py
@@ -124,7 +124,7 @@ def try_simplify_traceback(tb: TracebackType) ->
Optional[TracebackType]:
>>> import sys
>>> import traceback
>>> import tempfile
- >>> with tempfile.TemporaryDirectory() as tmp_dir:
+ >>> with tempfile.TemporaryDirectory(prefix="try_simplify_traceback") as
tmp_dir:
... with open("%s/dummy_module.py" % tmp_dir, "w") as f:
... _ = f.write(
... 'def raise_stop_iteration():\\n'
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]