This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c0a1ea2a4c42
[SPARK-49795][CORE][SQL][SS][DSTREAM][ML][MLLIB][K8S][YARN][EXAMPLES] Clean up
deprecated Guava API usage
c0a1ea2a4c42 is described below
commit c0a1ea2a4c4218fc15b8f990ed2f5ea99755d322
Author: yangjie01 <[email protected]>
AuthorDate: Mon Sep 30 23:45:21 2024 -0700
[SPARK-49795][CORE][SQL][SS][DSTREAM][ML][MLLIB][K8S][YARN][EXAMPLES] Clean
up deprecated Guava API usage
### What changes were proposed in this pull request?
In order to clean up the usage of deprecated Guava API, the following
changes were made in this pr:
1. Replaced `Files.write(from, to, charset)` with `Files.asCharSink(to,
charset).write(from)`. This change was made with reference to:
https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/io/Files.java#L275-L291
```java
/**
* Writes a character sequence (such as a string) to a file using the
given character set.
*
* param from the character sequence to write
* param to the destination file
* param charset the charset used to encode the output stream; see {link
StandardCharsets} for
* helpful predefined constants
* throws IOException if an I/O error occurs
* deprecated Prefer {code asCharSink(to, charset).write(from)}.
*/
Deprecated
InlineMe(
replacement = "Files.asCharSink(to, charset).write(from)",
imports = "com.google.common.io.Files")
public static void write(CharSequence from, File to, Charset charset)
throws IOException {
asCharSink(to, charset).write(from);
}
```
2. Replaced `Files.append(from, to, charset)` with `Files.asCharSink(to,
charset, FileWriteMode.APPEND).write(from)`. This change was made with
reference to:
https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/io/Files.java#L350-L368
```java
/**
* Appends a character sequence (such as a string) to a file using the
given character set.
*
* param from the character sequence to append
* param to the destination file
* param charset the charset used to encode the output stream; see {link
StandardCharsets} for
* helpful predefined constants
* throws IOException if an I/O error occurs
* deprecated Prefer {code asCharSink(to, charset,
FileWriteMode.APPEND).write(from)}. This
* method is scheduled to be removed in October 2019.
*/
Deprecated
InlineMe(
replacement = "Files.asCharSink(to, charset,
FileWriteMode.APPEND).write(from)",
imports = {"com.google.common.io.FileWriteMode",
"com.google.common.io.Files"})
public
static void append(CharSequence from, File to, Charset charset) throws
IOException {
asCharSink(to, charset, FileWriteMode.APPEND).write(from);
}
```
3. Replaced `Files.toString(file, charset)` with `Files.asCharSource(file,
charset).read()`. This change was made with reference to:
https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/io/Files.java#L243-L259
```java
/**
* Reads all characters from a file into a {link String}, using the given
character set.
*
* param file the file to read from
* param charset the charset used to decode the input stream; see {link
StandardCharsets} for
* helpful predefined constants
* return a string containing all the characters from the file
* throws IOException if an I/O error occurs
* deprecated Prefer {code asCharSource(file, charset).read()}.
*/
Deprecated
InlineMe(
replacement = "Files.asCharSource(file, charset).read()",
imports = "com.google.common.io.Files")
public static String toString(File file, Charset charset) throws
IOException {
return asCharSource(file, charset).read();
}
```
4. Replaced `HashFunction.murmur3_32()` with
`HashFunction.murmur3_32_fixed()`. This change was made with reference to:
https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/hash/Hashing.java#L99-L115
```java
/**
* Returns a hash function implementing the <a
*
href="https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp">32-bit
murmur3
* algorithm, x86 variant</a> (little-endian variant), using the given
seed value, <b>with a known
* bug</b> as described in the deprecation text.
*
* <p>The C++ equivalent is the MurmurHash3_x86_32 function (Murmur3A),
which however does not
* have the bug.
*
* deprecated This implementation produces incorrect hash values from the
{link
* HashFunction#hashString} method if the string contains non-BMP
characters. Use {link
* #murmur3_32_fixed()} instead.
*/
Deprecated
public static HashFunction murmur3_32() {
return Murmur3_32HashFunction.MURMUR3_32;
}
```
This change is safe for Spark. The difference between `MURMUR3_32` and
`MURMUR3_32_FIXED` lies in the different `supplementaryPlaneFix` parameters
passed when constructing the `Murmur3_32HashFunction`:
https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/hash/Murmur3_32HashFunction.java#L56-L59
```java
static final HashFunction MURMUR3_32 =
new Murmur3_32HashFunction(0, /* supplementaryPlaneFix= */ false);
static final HashFunction MURMUR3_32_FIXED =
new Murmur3_32HashFunction(0, /* supplementaryPlaneFix= */ true);
```
However, the `supplementaryPlaneFix` parameter is only used in
`Murmur3_32HashFunction#hashString`, and Spark only utilizes
`Murmur3_32HashFunction#hashInt`. Therefore, there will be no logical changes
to this method after this change.
https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/hash/Murmur3_32HashFunction.java#L108-L114
```java
Override
public HashCode hashInt(int input) {
int k1 = mixK1(input);
int h1 = mixH1(seed, k1);
return fmix(h1, Ints.BYTES);
}
```
5. Replaced `Throwables.propagateIfPossible(throwable, declaredType)` with
`Throwables.throwIfInstanceOf(throwable, declaredType)` +
`Throwables.throwIfUnchecked(throwable)`. This change was made with reference
to:
https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/base/Throwables.java#L156-L175
```
/**
* Propagates {code throwable} exactly as-is, if and only if it is an
instance of {link
* RuntimeException}, {link Error}, or {code declaredType}.
*
* <p><b>Discouraged</b> in favor of calling {link #throwIfInstanceOf}
and {link
* #throwIfUnchecked}.
*
* param throwable the Throwable to possibly propagate
* param declaredType the single checked exception type declared by the
calling method
* deprecated Use a combination of {link #throwIfInstanceOf} and {link
#throwIfUnchecked},
* which togther provide the same behavior except that they reject
{code null}.
*/
Deprecated
J2ktIncompatible
GwtIncompatible // propagateIfInstanceOf
public static <X extends Throwable> void propagateIfPossible(
CheckForNull Throwable throwable, Class<X> declaredType) throws X {
propagateIfInstanceOf(throwable, declaredType);
propagateIfPossible(throwable);
}
```
6. Made modifications to `Throwables.propagate` with reference to
https://github.com/google/guava/wiki/Why-we-deprecated-Throwables.propagate
- For cases where it is known to be a checked exception, including
`IOException`, `GeneralSecurityException`, `SaslException`, and
`RocksDBException`, none of which are subclasses of `RuntimeException` or
`Error`, directly replaced `Throwables.propagate(e)` with `throw new
RuntimeException(e);`.
- For cases where it cannot be determined whether it is a checked exception
or an unchecked exception or Error, use
```java
throwIfUnchecked(e);
throw new RuntimeException(e);
```
to replace `Throwables.propagate(e)`。
https://github.com/google/guava/blob/0c33dd12b193402cdf6962d43d69743521aa2f76/guava/src/com/google/common/base/Throwables.java#L199-L235
```java
/**
* ...
* deprecated To preserve behavior, use {code throw e} or {code throw new
RuntimeException(e)}
* directly, or use a combination of {link #throwIfUnchecked} and
{code throw new
* RuntimeException(e)}. But consider whether users would be better
off if your API threw a
* different type of exception. For background on the deprecation,
read <a
* href="https://goo.gl/Ivn2kc">Why we deprecated {code
Throwables.propagate}</a>.
*/
CanIgnoreReturnValue
J2ktIncompatible
GwtIncompatible
Deprecated
public static RuntimeException propagate(Throwable throwable) {
throwIfUnchecked(throwable);
throw new RuntimeException(throwable);
}
```
### Why are the changes needed?
Clean up deprecated Guava API usage.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GitHub Actions
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48248 from LuciferYang/guava-deprecation.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/util/kvstore/LevelDB.java | 3 ++-
.../apache/spark/util/kvstore/LevelDBIterator.java | 5 ++--
.../org/apache/spark/util/kvstore/RocksDB.java | 3 ++-
.../apache/spark/util/kvstore/RocksDBIterator.java | 5 ++--
.../spark/network/client/TransportClient.java | 6 +++--
.../network/client/TransportClientFactory.java | 3 ++-
.../spark/network/crypto/AuthClientBootstrap.java | 3 +--
.../spark/network/crypto/AuthRpcHandler.java | 3 ++-
.../apache/spark/network/sasl/SparkSaslClient.java | 7 +++--
.../apache/spark/network/sasl/SparkSaslServer.java | 5 ++--
.../spark/network/shuffledb/LevelDBIterator.java | 4 +--
.../apache/spark/network/shuffledb/RocksDB.java | 7 +++--
.../spark/network/shuffledb/RocksDBIterator.java | 3 +--
.../apache/spark/sql/kafka010/KafkaTestUtils.scala | 4 +--
.../org/apache/spark/io/ReadAheadInputStream.java | 3 ++-
.../main/scala/org/apache/spark/TestUtils.scala | 2 +-
.../apache/spark/deploy/worker/DriverRunner.scala | 4 +--
.../spark/deploy/worker/ExecutorRunner.scala | 2 +-
.../spark/util/collection/AppendOnlyMap.scala | 2 +-
.../apache/spark/util/collection/OpenHashSet.scala | 2 +-
.../java/test/org/apache/spark/JavaAPISuite.java | 2 +-
.../test/scala/org/apache/spark/FileSuite.scala | 4 +--
.../scala/org/apache/spark/SparkContextSuite.scala | 31 ++++++++++++----------
.../deploy/history/EventLogFileReadersSuite.scala | 6 ++---
.../deploy/history/FsHistoryProviderSuite.scala | 3 ++-
.../history/HistoryServerArgumentsSuite.scala | 4 +--
.../spark/deploy/history/HistoryServerSuite.scala | 2 +-
.../internal/plugin/PluginContainerSuite.scala | 2 +-
.../resource/ResourceDiscoveryPluginSuite.scala | 2 +-
.../scala/org/apache/spark/rpc/RpcEnvSuite.scala | 12 ++++-----
.../org/apache/spark/util/FileAppenderSuite.scala | 6 ++---
.../scala/org/apache/spark/util/UtilsSuite.scala | 6 ++---
.../streaming/JavaRecoverableNetworkWordCount.java | 4 ++-
.../streaming/RecoverableNetworkWordCount.scala | 5 ++--
.../ml/source/libsvm/JavaLibSVMRelationSuite.java | 2 +-
.../ml/source/libsvm/LibSVMRelationSuite.scala | 6 ++---
.../org/apache/spark/mllib/util/MLUtilsSuite.scala | 6 ++---
.../deploy/k8s/SparkKubernetesClientFactory.scala | 2 +-
.../k8s/features/HadoopConfDriverFeatureStep.scala | 2 +-
.../features/KerberosConfDriverFeatureStep.scala | 2 +-
.../k8s/features/PodTemplateConfigMapStep.scala | 2 +-
...iverKubernetesCredentialsFeatureStepSuite.scala | 2 +-
.../HadoopConfDriverFeatureStepSuite.scala | 2 +-
.../HadoopConfExecutorFeatureStepSuite.scala | 2 +-
.../KerberosConfDriverFeatureStepSuite.scala | 4 +--
.../k8s/integrationtest/DecommissionSuite.scala | 6 ++---
.../k8s/integrationtest/KubernetesSuite.scala | 2 +-
.../spark/deploy/yarn/BaseYarnClusterSuite.scala | 6 ++---
.../spark/deploy/yarn/YarnClusterSuite.scala | 29 ++++++++++----------
.../deploy/yarn/YarnShuffleIntegrationSuite.scala | 2 +-
.../sql/execution/arrow/ArrowConvertersSuite.scala | 6 ++---
.../thriftserver/HiveThriftServer2Suites.scala | 6 ++---
.../sql/hive/thriftserver/UISeleniumSuite.scala | 6 ++---
.../spark/sql/hive/execution/SQLQuerySuite.scala | 17 ++++++------
.../org/apache/spark/streaming/JavaAPISuite.java | 2 +-
.../apache/spark/streaming/CheckpointSuite.scala | 2 +-
.../apache/spark/streaming/InputStreamsSuite.scala | 10 +++----
.../apache/spark/streaming/MasterFailureTest.scala | 2 +-
58 files changed, 148 insertions(+), 145 deletions(-)
diff --git
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
index 13a9d89f4705..7f8d6c58aec7 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -255,7 +255,8 @@ public class LevelDB implements KVStore {
iteratorTracker.add(new WeakReference<>(it));
return it;
} catch (Exception e) {
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
}
};
diff --git
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
index 69757fdc65d6..29ed37ffa44e 100644
---
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
+++
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
@@ -127,7 +127,7 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
try {
close();
} catch (IOException ioe) {
- throw Throwables.propagate(ioe);
+ throw new RuntimeException(ioe);
}
}
return next != null;
@@ -151,7 +151,8 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
next = null;
return ret;
} catch (Exception e) {
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
}
diff --git
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
index dc7ad0be5c00..4bc2b233fe12 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
@@ -287,7 +287,8 @@ public class RocksDB implements KVStore {
iteratorTracker.add(new WeakReference<>(it));
return it;
} catch (Exception e) {
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
}
};
diff --git
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
index a98b0482e35c..e350ddc2d445 100644
---
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
+++
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
@@ -113,7 +113,7 @@ class RocksDBIterator<T> implements KVStoreIterator<T> {
try {
close();
} catch (IOException ioe) {
- throw Throwables.propagate(ioe);
+ throw new RuntimeException(ioe);
}
}
return next != null;
@@ -137,7 +137,8 @@ class RocksDBIterator<T> implements KVStoreIterator<T> {
next = null;
return ret;
} catch (Exception e) {
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
}
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 4c144a73a929..a9df47645d36 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -290,9 +290,11 @@ public class TransportClient implements Closeable {
try {
return result.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
- throw Throwables.propagate(e.getCause());
+ Throwables.throwIfUnchecked(e.getCause());
+ throw new RuntimeException(e.getCause());
} catch (Exception e) {
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
}
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index e1f19f956cc0..d64b8c8f838e 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -342,7 +342,8 @@ public class TransportClientFactory implements Closeable {
logger.error("Exception while bootstrapping client after {} ms", e,
MDC.of(LogKeys.BOOTSTRAP_TIME$.MODULE$, bootstrapTimeMs));
client.close();
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
long postBootstrap = System.nanoTime();
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
index 08e2c084fe67..2e9ccd0e0ad2 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.util.concurrent.TimeoutException;
-import com.google.common.base.Throwables;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
@@ -80,7 +79,7 @@ public class AuthClientBootstrap implements
TransportClientBootstrap {
doSparkAuth(client, channel);
client.setClientId(appId);
} catch (GeneralSecurityException | IOException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
} catch (RuntimeException e) {
// There isn't a good exception that can be caught here to know whether
it's really
// OK to switch back to SASL (because the server doesn't speak the new
protocol). So
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
index 65367743e24f..087e3d21e22b 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
@@ -132,7 +132,8 @@ class AuthRpcHandler extends AbstractAuthRpcHandler {
try {
engine.close();
} catch (Exception e) {
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
}
}
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java
b/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java
index 3600c1045dbf..a61b1c3c0c41 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslClient.java
@@ -29,7 +29,6 @@ import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import org.apache.spark.internal.SparkLogger;
@@ -62,7 +61,7 @@ public class SparkSaslClient implements SaslEncryptionBackend
{
this.saslClient = Sasl.createSaslClient(new String[] { DIGEST }, null,
null, DEFAULT_REALM,
saslProps, new ClientCallbackHandler());
} catch (SaslException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
@@ -72,7 +71,7 @@ public class SparkSaslClient implements SaslEncryptionBackend
{
try {
return saslClient.evaluateChallenge(new byte[0]);
} catch (SaslException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
} else {
return new byte[0];
@@ -98,7 +97,7 @@ public class SparkSaslClient implements SaslEncryptionBackend
{
try {
return saslClient != null ? saslClient.evaluateChallenge(token) : new
byte[0];
} catch (SaslException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
b/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
index b897650afe83..f32fd5145c7c 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
@@ -31,7 +31,6 @@ import java.nio.charset.StandardCharsets;
import java.util.Map;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -94,7 +93,7 @@ public class SparkSaslServer implements SaslEncryptionBackend
{
this.saslServer = Sasl.createSaslServer(DIGEST, null, DEFAULT_REALM,
saslProps,
new DigestCallbackHandler());
} catch (SaslException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
@@ -119,7 +118,7 @@ public class SparkSaslServer implements
SaslEncryptionBackend {
try {
return saslServer != null ? saslServer.evaluateResponse(token) : new
byte[0];
} catch (SaslException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDBIterator.java
b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDBIterator.java
index 5796e34a6f05..2ac549775449 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDBIterator.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/LevelDBIterator.java
@@ -17,8 +17,6 @@
package org.apache.spark.network.shuffledb;
-import com.google.common.base.Throwables;
-
import java.io.IOException;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -47,7 +45,7 @@ public class LevelDBIterator implements DBIterator {
try {
close();
} catch (IOException ioe) {
- throw Throwables.propagate(ioe);
+ throw new RuntimeException(ioe);
}
}
return next != null;
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDB.java
b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDB.java
index d33895d6c2d6..2737ab8ed754 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDB.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDB.java
@@ -19,7 +19,6 @@ package org.apache.spark.network.shuffledb;
import java.io.IOException;
-import com.google.common.base.Throwables;
import org.rocksdb.RocksDBException;
/**
@@ -37,7 +36,7 @@ public class RocksDB implements DB {
try {
db.put(key, value);
} catch (RocksDBException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
@@ -46,7 +45,7 @@ public class RocksDB implements DB {
try {
return db.get(key);
} catch (RocksDBException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
@@ -55,7 +54,7 @@ public class RocksDB implements DB {
try {
db.delete(key);
} catch (RocksDBException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
diff --git
a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java
b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java
index 78562f91a4b7..829a7ded6330 100644
---
a/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java
+++
b/common/network-common/src/main/java/org/apache/spark/network/shuffledb/RocksDBIterator.java
@@ -22,7 +22,6 @@ import java.util.AbstractMap;
import java.util.Map;
import java.util.NoSuchElementException;
-import com.google.common.base.Throwables;
import org.rocksdb.RocksIterator;
/**
@@ -52,7 +51,7 @@ public class RocksDBIterator implements DBIterator {
try {
close();
} catch (IOException ioe) {
- throw Throwables.propagate(ioe);
+ throw new RuntimeException(ioe);
}
}
return next != null;
diff --git
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 7852bc814ccd..c3f02eebab23 100644
---
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -176,7 +176,7 @@ class KafkaTestUtils(
}
kdc.getKrb5conf.delete()
- Files.write(krb5confStr, kdc.getKrb5conf, StandardCharsets.UTF_8)
+ Files.asCharSink(kdc.getKrb5conf,
StandardCharsets.UTF_8).write(krb5confStr)
logDebug(s"krb5.conf file content: $krb5confStr")
}
@@ -240,7 +240,7 @@ class KafkaTestUtils(
| principal="$kafkaServerUser@$realm";
|};
""".stripMargin.trim
- Files.write(content, file, StandardCharsets.UTF_8)
+ Files.asCharSink(file, StandardCharsets.UTF_8).write(content)
logDebug(s"Created JAAS file: ${file.getPath}")
logDebug(s"JAAS file content: $content")
file.getAbsolutePath()
diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
index 5e9f1b78273a..7dd87df713e6 100644
--- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
+++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
@@ -120,7 +120,8 @@ public class ReadAheadInputStream extends InputStream {
private void checkReadException() throws IOException {
if (readAborted) {
- Throwables.propagateIfPossible(readException, IOException.class);
+ Throwables.throwIfInstanceOf(readException, IOException.class);
+ Throwables.throwIfUnchecked(readException);
throw new IOException(readException);
}
}
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 5e3078d7292b..fed15a067c00 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -421,7 +421,7 @@ private[spark] object TestUtils extends SparkTestUtils {
def createTempScriptWithExpectedOutput(dir: File, prefix: String, output:
String): String = {
val file = File.createTempFile(prefix, ".sh", dir)
val script = s"cat <<EOF\n$output\nEOF\n"
- Files.write(script, file, StandardCharsets.UTF_8)
+ Files.asCharSink(file, StandardCharsets.UTF_8).write(script)
JavaFiles.setPosixFilePermissions(file.toPath,
EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
file.getPath
diff --git
a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index bb96ecb38a64..ca0e024ad1ae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
import scala.jdk.CollectionConverters._
-import com.google.common.io.Files
+import com.google.common.io.{Files, FileWriteMode}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
@@ -216,7 +216,7 @@ private[deploy] class DriverRunner(
val redactedCommand = Utils.redactCommandLineArgs(conf,
builder.command.asScala.toSeq)
.mkString("\"", "\" \"", "\"")
val header = "Launch Command: %s\n%s\n\n".format(redactedCommand, "=" *
40)
- Files.append(header, stderr, StandardCharsets.UTF_8)
+ Files.asCharSink(stderr, StandardCharsets.UTF_8,
FileWriteMode.APPEND).write(header)
CommandUtils.redirectStream(process.getErrorStream, stderr)
}
runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
diff --git
a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 8d0fb7a54f72..d21904dd16ea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -191,7 +191,7 @@ private[deploy] class ExecutorRunner(
stdoutAppender = FileAppender(process.getInputStream, stdout, conf, true)
val stderr = new File(executorDir, "stderr")
- Files.write(header, stderr, StandardCharsets.UTF_8)
+ Files.asCharSink(stderr, StandardCharsets.UTF_8).write(header)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf, true)
state = ExecutorState.RUNNING
diff --git
a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 46e311d8b047..ec43666898fa 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -208,7 +208,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
/**
* Re-hash a value to deal better with hash functions that don't differ in
the lower bits.
*/
- private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
+ private def rehash(h: Int): Int =
Hashing.murmur3_32_fixed().hashInt(h).asInt()
/** Double the table's size and re-hash everything */
protected def growTable(): Unit = {
diff --git
a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index a42fa9ba6bc8..3d1eb5788c70 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -266,7 +266,7 @@ class OpenHashSet[@specialized(Long, Int, Double, Float) T:
ClassTag](
/**
* Re-hash a value to deal better with hash functions that don't differ in
the lower bits.
*/
- private def hashcode(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
+ private def hashcode(h: Int): Int =
Hashing.murmur3_32_fixed().hashInt(h).asInt()
private def nextPowerOf2(n: Int): Int = {
if (n == 0) {
diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index 11bd2b2a3312..802cb2667cc8 100644
--- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -960,7 +960,7 @@ public class JavaAPISuite implements Serializable {
rdd.saveAsTextFile(outputDir);
// Read the plain text file and check it's OK
File outputFile = new File(outputDir, "part-00000");
- String content = Files.toString(outputFile, StandardCharsets.UTF_8);
+ String content = Files.asCharSource(outputFile,
StandardCharsets.UTF_8).read();
assertEquals("1\n2\n3\n4\n", content);
// Also try reading it in as a text file RDD
List<String> expected = Arrays.asList("1", "2", "3", "4");
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala
b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 5651dc9b2dbd..5f9912cbd021 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -334,8 +334,8 @@ class FileSuite extends SparkFunSuite with
LocalSparkContext {
for (i <- 0 until 8) {
val tempFile = new File(tempDir, s"part-0000$i")
- Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in
file1", tempFile,
- StandardCharsets.UTF_8)
+ Files.asCharSink(tempFile, StandardCharsets.UTF_8)
+ .write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1")
}
for (p <- Seq(1, 2, 8)) {
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 12f9d2f83c77..44b2da603a1f 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -119,8 +119,8 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
val absolutePath2 = file2.getAbsolutePath
try {
- Files.write("somewords1", file1, StandardCharsets.UTF_8)
- Files.write("somewords2", file2, StandardCharsets.UTF_8)
+ Files.asCharSink(file1, StandardCharsets.UTF_8).write("somewords1")
+ Files.asCharSink(file2, StandardCharsets.UTF_8).write("somewords2")
val length1 = file1.length()
val length2 = file2.length()
@@ -178,10 +178,10 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
s"${jarFile.getParent}/../${jarFile.getParentFile.getName}/${jarFile.getName}#zoo"
try {
- Files.write("somewords1", file1, StandardCharsets.UTF_8)
- Files.write("somewords22", file2, StandardCharsets.UTF_8)
- Files.write("somewords333", file3, StandardCharsets.UTF_8)
- Files.write("somewords4444", file4, StandardCharsets.UTF_8)
+ Files.asCharSink(file1, StandardCharsets.UTF_8).write("somewords1")
+ Files.asCharSink(file2, StandardCharsets.UTF_8).write("somewords22")
+ Files.asCharSink(file3, StandardCharsets.UTF_8).write("somewords333")
+ Files.asCharSink(file4, StandardCharsets.UTF_8).write("somewords4444")
val length1 = file1.length()
val length2 = file2.length()
val length3 = file1.length()
@@ -373,8 +373,8 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
assert(subdir2.mkdir())
val file1 = new File(subdir1, "file")
val file2 = new File(subdir2, "file")
- Files.write("old", file1, StandardCharsets.UTF_8)
- Files.write("new", file2, StandardCharsets.UTF_8)
+ Files.asCharSink(file1, StandardCharsets.UTF_8).write("old")
+ Files.asCharSink(file2, StandardCharsets.UTF_8).write("new")
sc = new SparkContext("local-cluster[1,1,1024]", "test")
sc.addFile(file1.getAbsolutePath)
def getAddedFileContents(): String = {
@@ -503,12 +503,15 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
try {
// Create 5 text files.
- Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in
file1", file1,
- StandardCharsets.UTF_8)
- Files.write("someline1 in file2\nsomeline2 in file2", file2,
StandardCharsets.UTF_8)
- Files.write("someline1 in file3", file3, StandardCharsets.UTF_8)
- Files.write("someline1 in file4\nsomeline2 in file4", file4,
StandardCharsets.UTF_8)
- Files.write("someline1 in file2\nsomeline2 in file5", file5,
StandardCharsets.UTF_8)
+ Files.asCharSink(file1, StandardCharsets.UTF_8)
+ .write("someline1 in file1\nsomeline2 in file1\nsomeline3 in
file1")
+ Files.asCharSink(file2, StandardCharsets.UTF_8)
+ .write("someline1 in file2\nsomeline2 in file2")
+ Files.asCharSink(file3, StandardCharsets.UTF_8).write("someline1 in
file3")
+ Files.asCharSink(file4, StandardCharsets.UTF_8)
+ .write("someline1 in file4\nsomeline2 in file4")
+ Files.asCharSink(file5, StandardCharsets.UTF_8)
+ .write("someline1 in file2\nsomeline2 in file5")
sc = new SparkContext(new
SparkConf().setAppName("test").setMaster("local"))
diff --git
a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
index f34f792881f9..7501a98a1a57 100644
---
a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
@@ -221,7 +221,7 @@ class SingleFileEventLogFileReaderSuite extends
EventLogFileReadersSuite {
val entry = is.getNextEntry
assert(entry != null)
val actual = new String(ByteStreams.toByteArray(is),
StandardCharsets.UTF_8)
- val expected = Files.toString(new File(logPath.toString),
StandardCharsets.UTF_8)
+ val expected = Files.asCharSource(new File(logPath.toString),
StandardCharsets.UTF_8).read()
assert(actual === expected)
assert(is.getNextEntry === null)
}
@@ -368,8 +368,8 @@ class RollingEventLogFilesReaderSuite extends
EventLogFileReadersSuite {
assert(allFileNames.contains(fileName))
val actual = new String(ByteStreams.toByteArray(is),
StandardCharsets.UTF_8)
- val expected = Files.toString(new File(logPath.toString, fileName),
- StandardCharsets.UTF_8)
+ val expected = Files.asCharSource(
+ new File(logPath.toString, fileName),
StandardCharsets.UTF_8).read()
assert(actual === expected)
}
}
diff --git
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 3013a5bf4a29..852f94bda870 100644
---
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -708,7 +708,8 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite
with Matchers with P
while (entry != null) {
val actual = new String(ByteStreams.toByteArray(inputStream),
StandardCharsets.UTF_8)
val expected =
- Files.toString(logs.find(_.getName == entry.getName).get,
StandardCharsets.UTF_8)
+ Files.asCharSource(logs.find(_.getName == entry.getName).get,
StandardCharsets.UTF_8)
+ .read()
actual should be (expected)
totalEntries += 1
entry = inputStream.getNextEntry
diff --git
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
index 2b9b110a4142..807e5ec3e823 100644
---
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
@@ -45,8 +45,8 @@ class HistoryServerArgumentsSuite extends SparkFunSuite {
test("Properties File Arguments Parsing --properties-file") {
withTempDir { tmpDir =>
val outFile = File.createTempFile("test-load-spark-properties", "test",
tmpDir)
- Files.write("spark.test.CustomPropertyA blah\n" +
- "spark.test.CustomPropertyB notblah\n", outFile, UTF_8)
+ Files.asCharSink(outFile, UTF_8).write("spark.test.CustomPropertyA
blah\n" +
+ "spark.test.CustomPropertyB notblah\n")
val argStrings = Array("--properties-file", outFile.getAbsolutePath)
val hsa = new HistoryServerArguments(conf, argStrings)
assert(conf.get("spark.test.CustomPropertyA") === "blah")
diff --git
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index abb5ae720af0..6b2bd90cd431 100644
---
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -283,7 +283,7 @@ abstract class HistoryServerSuite extends SparkFunSuite
with BeforeAndAfter with
val expectedFile = {
new File(logDir, entry.getName)
}
- val expected = Files.toString(expectedFile, StandardCharsets.UTF_8)
+ val expected = Files.asCharSource(expectedFile,
StandardCharsets.UTF_8).read()
val actual = new String(ByteStreams.toByteArray(zipStream),
StandardCharsets.UTF_8)
actual should be (expected)
filesCompared += 1
diff --git
a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
index 79fa8d21bf3f..fc8f48df2cb7 100644
---
a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
+++
b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
@@ -383,7 +383,7 @@ object NonLocalModeSparkPlugin {
resources: Map[String, ResourceInformation]): Unit = {
val path = conf.get(TEST_PATH_CONF)
val strToWrite = createFileStringWithGpuAddrs(id, resources)
- Files.write(strToWrite, new File(path, s"$filePrefix$id"),
StandardCharsets.UTF_8)
+ Files.asCharSink(new File(path, s"$filePrefix$id"),
StandardCharsets.UTF_8).write(strToWrite)
}
def reset(): Unit = {
diff --git
a/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala
b/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala
index ff7d68035217..edf138df9e20 100644
---
a/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala
+++
b/core/src/test/scala/org/apache/spark/resource/ResourceDiscoveryPluginSuite.scala
@@ -148,7 +148,7 @@ object TestResourceDiscoveryPlugin {
def writeFile(conf: SparkConf, id: String): Unit = {
val path = conf.get(TEST_PATH_CONF)
val fileName = s"$id - ${UUID.randomUUID.toString}"
- Files.write(id, new File(path, fileName), StandardCharsets.UTF_8)
+ Files.asCharSink(new File(path, fileName),
StandardCharsets.UTF_8).write(id)
}
}
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 3ef382573517..66b1ee7b58ac 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -868,23 +868,23 @@ abstract class RpcEnvSuite extends SparkFunSuite {
val conf = createSparkConf()
val file = new File(tempDir, "file")
- Files.write(UUID.randomUUID().toString(), file, UTF_8)
+ Files.asCharSink(file, UTF_8).write(UUID.randomUUID().toString)
val fileWithSpecialChars = new File(tempDir, "file name")
- Files.write(UUID.randomUUID().toString(), fileWithSpecialChars, UTF_8)
+ Files.asCharSink(fileWithSpecialChars,
UTF_8).write(UUID.randomUUID().toString)
val empty = new File(tempDir, "empty")
- Files.write("", empty, UTF_8);
+ Files.asCharSink(empty, UTF_8).write("")
val jar = new File(tempDir, "jar")
- Files.write(UUID.randomUUID().toString(), jar, UTF_8)
+ Files.asCharSink(jar, UTF_8).write(UUID.randomUUID().toString)
val dir1 = new File(tempDir, "dir1")
assert(dir1.mkdir())
val subFile1 = new File(dir1, "file1")
- Files.write(UUID.randomUUID().toString(), subFile1, UTF_8)
+ Files.asCharSink(subFile1, UTF_8).write(UUID.randomUUID().toString)
val dir2 = new File(tempDir, "dir2")
assert(dir2.mkdir())
val subFile2 = new File(dir2, "file2")
- Files.write(UUID.randomUUID().toString(), subFile2, UTF_8)
+ Files.asCharSink(subFile2, UTF_8).write(UUID.randomUUID().toString)
val fileUri = env.fileServer.addFile(file)
val fileWithSpecialCharsUri =
env.fileServer.addFile(fileWithSpecialChars)
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 35ef0587b9b4..4497ea1b2b79 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -54,11 +54,11 @@ class FileAppenderSuite extends SparkFunSuite with
BeforeAndAfter {
val inputStream = new
ByteArrayInputStream(testString.getBytes(StandardCharsets.UTF_8))
// The `header` should not be covered
val header = "Add header"
- Files.write(header, testFile, StandardCharsets.UTF_8)
+ Files.asCharSink(testFile, StandardCharsets.UTF_8).write(header)
val appender = new FileAppender(inputStream, testFile)
inputStream.close()
appender.awaitTermination()
- assert(Files.toString(testFile, StandardCharsets.UTF_8) === header +
testString)
+ assert(Files.asCharSource(testFile, StandardCharsets.UTF_8).read() ===
header + testString)
}
test("SPARK-35027: basic file appender - close stream") {
@@ -392,7 +392,7 @@ class FileAppenderSuite extends SparkFunSuite with
BeforeAndAfter {
IOUtils.closeQuietly(inputStream)
}
} else {
- Files.toString(file, StandardCharsets.UTF_8)
+ Files.asCharSource(file, StandardCharsets.UTF_8).read()
}
}.mkString("")
assert(allText === expectedText)
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index a694e08def89..a6e3345fc600 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -735,8 +735,8 @@ class UtilsSuite extends SparkFunSuite with
ResetSystemProperties {
withTempDir { tmpDir =>
val outFile = File.createTempFile("test-load-spark-properties", "test",
tmpDir)
System.setProperty("spark.test.fileNameLoadB", "2")
- Files.write("spark.test.fileNameLoadA true\n" +
- "spark.test.fileNameLoadB 1\n", outFile, UTF_8)
+ Files.asCharSink(outFile, UTF_8).write("spark.test.fileNameLoadA true\n"
+
+ "spark.test.fileNameLoadB 1\n")
val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath)
properties
.filter { case (k, v) => k.startsWith("spark.")}
@@ -765,7 +765,7 @@ class UtilsSuite extends SparkFunSuite with
ResetSystemProperties {
val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath)
val sourceFile = File.createTempFile("someprefix", "somesuffix",
innerSourceDir)
val targetDir = new File(tempDir, "target-dir")
- Files.write("some text", sourceFile, UTF_8)
+ Files.asCharSink(sourceFile, UTF_8).write("some text")
val path =
if (Utils.isWindows) {
diff --git
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index 0c11c40cfe7e..1052f47ea496 100644
---
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
+import com.google.common.io.FileWriteMode;
import scala.Tuple2;
import com.google.common.io.Files;
@@ -152,7 +153,8 @@ public final class JavaRecoverableNetworkWordCount {
System.out.println(output);
System.out.println("Dropped " + droppedWordsCounter.value() + " word(s)
totally");
System.out.println("Appending to " + outputFile.getAbsolutePath());
- Files.append(output + "\n", outputFile, Charset.defaultCharset());
+ Files.asCharSink(outputFile, Charset.defaultCharset(),
FileWriteMode.APPEND)
+ .write(output + "\n");
});
return ssc;
diff --git
a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 98539d649423..1ec6ee4abd32 100644
---
a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++
b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -21,7 +21,7 @@ package org.apache.spark.examples.streaming
import java.io.File
import java.nio.charset.Charset
-import com.google.common.io.Files
+import com.google.common.io.{Files, FileWriteMode}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
@@ -134,7 +134,8 @@ object RecoverableNetworkWordCount {
println(output)
println(s"Dropped ${droppedWordsCounter.value} word(s) totally")
println(s"Appending to ${outputFile.getAbsolutePath}")
- Files.append(output + "\n", outputFile, Charset.defaultCharset())
+ Files.asCharSink(outputFile, Charset.defaultCharset(),
FileWriteMode.APPEND)
+ .write(output + "\n")
}
ssc
}
diff --git
a/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java
b/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java
index c3038fa9e1f8..5f0d22ea2a8a 100644
---
a/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java
+++
b/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java
@@ -50,7 +50,7 @@ public class JavaLibSVMRelationSuite extends
SharedSparkSession {
tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"),
"datasource");
File file = new File(tempDir, "part-00000");
String s = "1 1:1.0 3:2.0 5:3.0\n0\n0 2:4.0 4:5.0 6:6.0";
- Files.write(s, file, StandardCharsets.UTF_8);
+ Files.asCharSink(file, StandardCharsets.UTF_8).write(s);
path = tempDir.toURI().toString();
}
diff --git
a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
index f2bb14561472..6a0d7b1237ee 100644
---
a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
+++
b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
@@ -65,9 +65,9 @@ class LibSVMRelationSuite
val succ = new File(dir, "_SUCCESS")
val file0 = new File(dir, "part-00000")
val file1 = new File(dir, "part-00001")
- Files.write("", succ, StandardCharsets.UTF_8)
- Files.write(lines0, file0, StandardCharsets.UTF_8)
- Files.write(lines1, file1, StandardCharsets.UTF_8)
+ Files.asCharSink(succ, StandardCharsets.UTF_8).write("")
+ Files.asCharSink(file0, StandardCharsets.UTF_8).write(lines0)
+ Files.asCharSink(file1, StandardCharsets.UTF_8).write(lines1)
path = dir.getPath
}
diff --git
a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index a90c9c80d495..1a02e26b9260 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -93,7 +93,7 @@ class MLUtilsSuite extends SparkFunSuite with
MLlibTestSparkContext {
""".stripMargin
val tempDir = Utils.createTempDir()
val file = new File(tempDir.getPath, "part-00000")
- Files.write(lines, file, StandardCharsets.UTF_8)
+ Files.asCharSink(file, StandardCharsets.UTF_8).write(lines)
val path = tempDir.toURI.toString
val pointsWithNumFeatures = loadLibSVMFile(sc, path, 6).collect()
@@ -126,7 +126,7 @@ class MLUtilsSuite extends SparkFunSuite with
MLlibTestSparkContext {
""".stripMargin
val tempDir = Utils.createTempDir()
val file = new File(tempDir.getPath, "part-00000")
- Files.write(lines, file, StandardCharsets.UTF_8)
+ Files.asCharSink(file, StandardCharsets.UTF_8).write(lines)
val path = tempDir.toURI.toString
intercept[SparkException] {
@@ -143,7 +143,7 @@ class MLUtilsSuite extends SparkFunSuite with
MLlibTestSparkContext {
""".stripMargin
val tempDir = Utils.createTempDir()
val file = new File(tempDir.getPath, "part-00000")
- Files.write(lines, file, StandardCharsets.UTF_8)
+ Files.asCharSink(file, StandardCharsets.UTF_8).write(lines)
val path = tempDir.toURI.toString
intercept[SparkException] {
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
index 79f76e96474e..2c28dc380046 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -107,7 +107,7 @@ object SparkKubernetesClientFactory extends Logging {
(token, configBuilder) => configBuilder.withOauthToken(token)
}.withOption(oauthTokenFile) {
(file, configBuilder) =>
- configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8))
+ configBuilder.withOauthToken(Files.asCharSource(file,
Charsets.UTF_8).read())
}.withOption(caCertFile) {
(file, configBuilder) => configBuilder.withCaCertFile(file)
}.withOption(clientKeyFile) {
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala
index e266d0f904e4..d64378a65d66 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala
@@ -116,7 +116,7 @@ private[spark] class HadoopConfDriverFeatureStep(conf:
KubernetesConf)
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
if (confDir.isDefined) {
val fileMap = confFiles.map { file =>
- (file.getName(), Files.toString(file, StandardCharsets.UTF_8))
+ (file.getName(), Files.asCharSource(file,
StandardCharsets.UTF_8).read())
}.toMap.asJava
Seq(new ConfigMapBuilder()
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
index 82bda88892d0..89aefe47e46d 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
@@ -229,7 +229,7 @@ private[spark] class
KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
.endMetadata()
.withImmutable(true)
.addToData(
- Map(file.getName() -> Files.toString(file,
StandardCharsets.UTF_8)).asJava)
+ Map(file.getName() -> Files.asCharSource(file,
StandardCharsets.UTF_8).read()).asJava)
.build()
}
} ++ {
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
index cdc011229411..f94dad2d15dc 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
@@ -81,7 +81,7 @@ private[spark] class PodTemplateConfigMapStep(conf:
KubernetesConf)
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf.sparkConf)
val uri = downloadFile(podTemplateFile, Utils.createTempDir(),
conf.sparkConf, hadoopConf)
val file = new java.net.URI(uri).getPath
- val podTemplateString = Files.toString(new File(file),
StandardCharsets.UTF_8)
+ val podTemplateString = Files.asCharSource(new File(file),
StandardCharsets.UTF_8).read()
Seq(new ConfigMapBuilder()
.withNewMetadata()
.withName(configmapName)
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
index f1dd8b94f17f..a72152a851c4 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
@@ -128,7 +128,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends
SparkFunSuite {
private def writeCredentials(credentialsFileName: String,
credentialsContents: String): File = {
val credentialsFile = new File(credentialsTempDirectory,
credentialsFileName)
- Files.write(credentialsContents, credentialsFile, Charsets.UTF_8)
+ Files.asCharSink(credentialsFile,
Charsets.UTF_8).write(credentialsContents)
credentialsFile
}
}
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala
index 8f21b95236a9..4310ac0220e5 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala
@@ -48,7 +48,7 @@ class HadoopConfDriverFeatureStepSuite extends SparkFunSuite {
val confFiles = Set("core-site.xml", "hdfs-site.xml")
confFiles.foreach { f =>
- Files.write("some data", new File(confDir, f), UTF_8)
+ Files.asCharSink(new File(confDir, f), UTF_8).write("some data")
}
val sparkConf = new SparkConfWithEnv(Map(ENV_HADOOP_CONF_DIR ->
confDir.getAbsolutePath()))
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala
index a60227814eb1..04e20258d068 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStepSuite.scala
@@ -36,7 +36,7 @@ class HadoopConfExecutorFeatureStepSuite extends
SparkFunSuite {
val confFiles = Set("core-site.xml", "hdfs-site.xml")
confFiles.foreach { f =>
- Files.write("some data", new File(confDir, f), UTF_8)
+ Files.asCharSink(new File(confDir, f), UTF_8).write("some data")
}
Seq(
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala
index 163d87643abd..b172bdc06ddc 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala
@@ -55,7 +55,7 @@ class KerberosConfDriverFeatureStepSuite extends
SparkFunSuite {
test("create krb5.conf config map if local config provided") {
val krbConf = File.createTempFile("krb5", ".conf", tmpDir)
- Files.write("some data", krbConf, UTF_8)
+ Files.asCharSink(krbConf, UTF_8).write("some data")
val sparkConf = new SparkConf(false)
.set(KUBERNETES_KERBEROS_KRB5_FILE, krbConf.getAbsolutePath())
@@ -70,7 +70,7 @@ class KerberosConfDriverFeatureStepSuite extends
SparkFunSuite {
test("create keytab secret if client keytab file used") {
val keytab = File.createTempFile("keytab", ".bin", tmpDir)
- Files.write("some data", keytab, UTF_8)
+ Files.asCharSink(keytab, UTF_8).write("some data")
val sparkConf = new SparkConf(false)
.set(KEYTAB, keytab.getAbsolutePath())
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index ae5f037c6b7d..950079dcb536 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -40,7 +40,7 @@ private[spark] trait DecommissionSuite { k8sSuite:
KubernetesSuite =>
val logConfFilePath = s"${sparkHomeDir.toFile}/conf/log4j2.properties"
try {
- Files.write(
+ Files.asCharSink(new File(logConfFilePath),
StandardCharsets.UTF_8).write(
"""rootLogger.level = info
|rootLogger.appenderRef.stdout.ref = console
|appender.console.type = Console
@@ -51,9 +51,7 @@ private[spark] trait DecommissionSuite { k8sSuite:
KubernetesSuite =>
|
|logger.spark.name = org.apache.spark
|logger.spark.level = debug
- """.stripMargin,
- new File(logConfFilePath),
- StandardCharsets.UTF_8)
+ """.stripMargin)
f()
} finally {
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 0b0b30e5e04f..cf129677ad9c 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -129,7 +129,7 @@ class KubernetesSuite extends SparkFunSuite
val tagFile = new File(path)
require(tagFile.isFile,
s"No file found for image tag at ${tagFile.getAbsolutePath}.")
- Files.toString(tagFile, Charsets.UTF_8).trim
+ Files.asCharSource(tagFile, Charsets.UTF_8).read().trim
}
.orElse(sys.props.get(CONFIG_KEY_IMAGE_TAG))
.getOrElse {
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index f0177541accc..e0dfac62847e 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -86,7 +86,7 @@ abstract class BaseYarnClusterSuite extends SparkFunSuite
with Matchers {
logConfDir.mkdir()
val logConfFile = new File(logConfDir, "log4j2.properties")
- Files.write(LOG4J_CONF, logConfFile, StandardCharsets.UTF_8)
+ Files.asCharSink(logConfFile, StandardCharsets.UTF_8).write(LOG4J_CONF)
// Disable the disk utilization check to avoid the test hanging when
people's disks are
// getting full.
@@ -232,11 +232,11 @@ abstract class BaseYarnClusterSuite extends SparkFunSuite
with Matchers {
// an error message
val output = new Object() {
override def toString: String = outFile
- .map(Files.toString(_, StandardCharsets.UTF_8))
+ .map(Files.asCharSource(_, StandardCharsets.UTF_8).read())
.getOrElse("(stdout/stderr was not captured)")
}
assert(finalState === SparkAppHandle.State.FINISHED, output)
- val resultString = Files.toString(result, StandardCharsets.UTF_8)
+ val resultString = Files.asCharSource(result,
StandardCharsets.UTF_8).read()
assert(resultString === expected, output)
}
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 806efd39800f..92d9f2d62d1c 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -141,7 +141,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
| </property>
|</configuration>
|""".stripMargin
- Files.write(coreSite, new File(customConf, "core-site.xml"),
StandardCharsets.UTF_8)
+ Files.asCharSink(new File(customConf, "core-site.xml"),
StandardCharsets.UTF_8).write(coreSite)
val result = File.createTempFile("result", null, tempDir)
val finalState = runSpark(false,
@@ -295,23 +295,22 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
test("running Spark in yarn-cluster mode displays driver log links") {
val log4jConf = new File(tempDir, "log4j.properties")
val logOutFile = new File(tempDir, "logs")
- Files.write(
+ Files.asCharSink(log4jConf, StandardCharsets.UTF_8).write(
s"""rootLogger.level = debug
|rootLogger.appenderRef.file.ref = file
|appender.file.type = File
|appender.file.name = file
|appender.file.fileName = $logOutFile
|appender.file.layout.type = PatternLayout
- |""".stripMargin,
- log4jConf, StandardCharsets.UTF_8)
+ |""".stripMargin)
// Since this test is trying to extract log output from the SparkSubmit
process itself,
// standard options to the Spark process don't take effect. Leverage the
java-opts file which
// will get picked up for the SparkSubmit process.
val confDir = new File(tempDir, "conf")
confDir.mkdir()
val javaOptsFile = new File(confDir, "java-opts")
- Files.write(s"-Dlog4j.configurationFile=file://$log4jConf\n", javaOptsFile,
- StandardCharsets.UTF_8)
+ Files.asCharSink(javaOptsFile, StandardCharsets.UTF_8)
+ .write(s"-Dlog4j.configurationFile=file://$log4jConf\n")
val result = File.createTempFile("result", null, tempDir)
val finalState = runSpark(clientMode = false,
@@ -320,7 +319,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
extraEnv = Map("SPARK_CONF_DIR" -> confDir.getAbsolutePath),
extraConf = Map(CLIENT_INCLUDE_DRIVER_LOGS_LINK.key -> true.toString))
checkResult(finalState, result)
- val logOutput = Files.toString(logOutFile, StandardCharsets.UTF_8)
+ val logOutput = Files.asCharSource(logOutFile,
StandardCharsets.UTF_8).read()
val logFilePattern = raw"""(?s).+\sDriver Logs \(<NAME>\):
https?://.+/<NAME>(\?\S+)?\s.+"""
logOutput should fullyMatch regex logFilePattern.replace("<NAME>",
"stdout")
logOutput should fullyMatch regex logFilePattern.replace("<NAME>",
"stderr")
@@ -374,7 +373,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
extraEnv: Map[String, String] = Map()): Unit = {
assume(isPythonAvailable)
val primaryPyFile = new File(tempDir, "test.py")
- Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8)
+ Files.asCharSink(primaryPyFile, StandardCharsets.UTF_8).write(TEST_PYFILE)
// When running tests, let's not assume the user has built the assembly
module, which also
// creates the pyspark archive. Instead, let's use PYSPARK_ARCHIVES_PATH
to point at the
@@ -396,7 +395,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
subdir
}
val pyModule = new File(moduleDir, "mod1.py")
- Files.write(TEST_PYMODULE, pyModule, StandardCharsets.UTF_8)
+ Files.asCharSink(pyModule, StandardCharsets.UTF_8).write(TEST_PYMODULE)
val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" ->
TEST_PYMODULE), moduleDir)
val pyFiles = Seq(pyModule.getAbsolutePath(),
mod2Archive.getPath()).mkString(",")
@@ -443,7 +442,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
def createEmptyIvySettingsFile: File = {
val emptyIvySettings = File.createTempFile("ivy", ".xml")
- Files.write("<ivysettings />", emptyIvySettings, StandardCharsets.UTF_8)
+ Files.asCharSink(emptyIvySettings,
StandardCharsets.UTF_8).write("<ivysettings />")
emptyIvySettings
}
@@ -555,7 +554,7 @@ private object YarnClusterDriverUseSparkHadoopUtilConf
extends Logging with Matc
}
result = "success"
} finally {
- Files.write(result, status, StandardCharsets.UTF_8)
+ Files.asCharSink(status, StandardCharsets.UTF_8).write(result)
sc.stop()
}
}
@@ -658,7 +657,7 @@ private object YarnClusterDriver extends Logging with
Matchers {
assert(driverAttributes === expectationAttributes)
}
} finally {
- Files.write(result, status, StandardCharsets.UTF_8)
+ Files.asCharSink(status, StandardCharsets.UTF_8).write(result)
sc.stop()
}
}
@@ -707,7 +706,7 @@ private object YarnClasspathTest extends Logging {
case t: Throwable =>
error(s"loading test.resource to $resultPath", t)
} finally {
- Files.write(result, new File(resultPath), StandardCharsets.UTF_8)
+ Files.asCharSink(new File(resultPath),
StandardCharsets.UTF_8).write(result)
}
}
@@ -751,7 +750,7 @@ private object YarnAddJarTest extends Logging {
result = "success"
}
} finally {
- Files.write(result, new File(resultPath), StandardCharsets.UTF_8)
+ Files.asCharSink(new File(resultPath),
StandardCharsets.UTF_8).write(result)
sc.stop()
}
}
@@ -796,7 +795,7 @@ private object ExecutorEnvTestApp {
executorEnvs.get(k).contains(v)
}
- Files.write(result.toString, new File(status), StandardCharsets.UTF_8)
+ Files.asCharSink(new File(status),
StandardCharsets.UTF_8).write(result.toString)
sc.stop()
}
diff --git
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index f745265eddfd..f8d69c0ae568 100644
---
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -181,7 +181,7 @@ private object YarnExternalShuffleDriver extends Logging
with Matchers {
if (execStateCopy != null) {
FileUtils.deleteDirectory(execStateCopy)
}
- Files.write(result, status, StandardCharsets.UTF_8)
+ Files.asCharSink(status, StandardCharsets.UTF_8).write(result)
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
index 275b35947182..c90b1d3ca597 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
@@ -1217,8 +1217,8 @@ class ArrowConvertersSuite extends SharedSparkSession {
val tempFile1 = new File(tempDataPath, "testData2-ints-part1.json")
val tempFile2 = new File(tempDataPath, "testData2-ints-part2.json")
- Files.write(json1, tempFile1, StandardCharsets.UTF_8)
- Files.write(json2, tempFile2, StandardCharsets.UTF_8)
+ Files.asCharSink(tempFile1, StandardCharsets.UTF_8).write(json1)
+ Files.asCharSink(tempFile2, StandardCharsets.UTF_8).write(json2)
validateConversion(schema, arrowBatches(0), tempFile1)
validateConversion(schema, arrowBatches(1), tempFile2)
@@ -1501,7 +1501,7 @@ class ArrowConvertersSuite extends SharedSparkSession {
// NOTE: coalesce to single partition because can only load 1 batch in
validator
val batchBytes = df.coalesce(1).toArrowBatchRdd.collect().head
val tempFile = new File(tempDataPath, file)
- Files.write(json, tempFile, StandardCharsets.UTF_8)
+ Files.asCharSink(tempFile, StandardCharsets.UTF_8).write(json)
validateConversion(df.schema, batchBytes, tempFile, timeZoneId,
errorOnDuplicatedFieldNames)
}
diff --git
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 4575549005f3..f1f0befcb0d3 100644
---
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -1222,7 +1222,7 @@ abstract class HiveThriftServer2TestBase extends
SparkFunSuite with BeforeAndAft
// overrides all other potential log4j configurations contained in other
dependency jar files.
val tempLog4jConf = Utils.createTempDir().getCanonicalPath
- Files.write(
+ Files.asCharSink(new File(s"$tempLog4jConf/log4j2.properties"),
StandardCharsets.UTF_8).write(
"""rootLogger.level = info
|rootLogger.appenderRef.stdout.ref = console
|appender.console.type = Console
@@ -1230,9 +1230,7 @@ abstract class HiveThriftServer2TestBase extends
SparkFunSuite with BeforeAndAft
|appender.console.target = SYSTEM_ERR
|appender.console.layout.type = PatternLayout
|appender.console.layout.pattern = %d{HH:mm:ss.SSS} %p %c:
%maxLen{%m}{512}%n%ex{8}%n
- """.stripMargin,
- new File(s"$tempLog4jConf/log4j2.properties"),
- StandardCharsets.UTF_8)
+ """.stripMargin)
tempLog4jConf
}
diff --git
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
index 2b2cbec41d64..8d4a9886a2b2 100644
---
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
+++
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
@@ -75,7 +75,7 @@ class UISeleniumSuite
// overrides all other potential log4j configurations contained in other
dependency jar files.
val tempLog4jConf =
org.apache.spark.util.Utils.createTempDir().getCanonicalPath
- Files.write(
+ Files.asCharSink(new File(s"$tempLog4jConf/log4j2.properties"),
StandardCharsets.UTF_8).write(
"""rootLogger.level = info
|rootLogger.appenderRef.file.ref = console
|appender.console.type = Console
@@ -83,9 +83,7 @@ class UISeleniumSuite
|appender.console.target = SYSTEM_ERR
|appender.console.layout.type = PatternLayout
|appender.console.layout.pattern = %d{HH:mm:ss.SSS} %p %c:
%maxLen{%m}{512}%n%ex{8}%n
- """.stripMargin,
- new File(s"$tempLog4jConf/log4j2.properties"),
- StandardCharsets.UTF_8)
+ """.stripMargin)
tempLog4jConf
}
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 14051034a588..1c45b02375b3 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.util.{Locale, Set}
-import com.google.common.io.Files
+import com.google.common.io.{Files, FileWriteMode}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkException, TestUtils}
@@ -1947,10 +1947,10 @@ abstract class SQLQuerySuiteBase extends QueryTest with
SQLTestUtils with TestHi
val path = dir.toURI.toString.stripSuffix("/")
val dirPath = dir.getAbsoluteFile
for (i <- 1 to 3) {
- Files.write(s"$i", new File(dirPath, s"part-r-0000$i"),
StandardCharsets.UTF_8)
+ Files.asCharSink(new File(dirPath, s"part-r-0000$i"),
StandardCharsets.UTF_8).write(s"$i")
}
for (i <- 5 to 7) {
- Files.write(s"$i", new File(dirPath, s"part-s-0000$i"),
StandardCharsets.UTF_8)
+ Files.asCharSink(new File(dirPath, s"part-s-0000$i"),
StandardCharsets.UTF_8).write(s"$i")
}
withTable("load_t") {
@@ -1971,7 +1971,7 @@ abstract class SQLQuerySuiteBase extends QueryTest with
SQLTestUtils with TestHi
val path = dir.toURI.toString.stripSuffix("/")
val dirPath = dir.getAbsoluteFile
for (i <- 1 to 3) {
- Files.write(s"$i", new File(dirPath, s"part-r-0000 $i"),
StandardCharsets.UTF_8)
+ Files.asCharSink(new File(dirPath, s"part-r-0000 $i"),
StandardCharsets.UTF_8).write(s"$i")
}
withTable("load_t") {
sql("CREATE TABLE load_t (a STRING) USING hive")
@@ -1986,7 +1986,7 @@ abstract class SQLQuerySuiteBase extends QueryTest with
SQLTestUtils with TestHi
val path = dir.toURI.toString.stripSuffix("/")
val dirPath = dir.getAbsoluteFile
for (i <- 1 to 3) {
- Files.write(s"$i", new File(dirPath, s"part-r-0000$i"),
StandardCharsets.UTF_8)
+ Files.asCharSink(new File(dirPath, s"part-r-0000$i"),
StandardCharsets.UTF_8).write(s"$i")
}
withTable("load_t") {
sql("CREATE TABLE load_t (a STRING) USING hive")
@@ -2010,7 +2010,7 @@ abstract class SQLQuerySuiteBase extends QueryTest with
SQLTestUtils with TestHi
val path = dir.toURI.toString.stripSuffix("/")
val dirPath = dir.getAbsoluteFile
for (i <- 1 to 3) {
- Files.write(s"$i", new File(dirPath, s"part-r-0000$i"),
StandardCharsets.UTF_8)
+ Files.asCharSink(new File(dirPath, s"part-r-0000$i"),
StandardCharsets.UTF_8).write(s"$i")
}
withTable("load_t1") {
sql("CREATE TABLE load_t1 (a STRING) USING hive")
@@ -2025,7 +2025,7 @@ abstract class SQLQuerySuiteBase extends QueryTest with
SQLTestUtils with TestHi
val path = dir.toURI.toString.stripSuffix("/")
val dirPath = dir.getAbsoluteFile
for (i <- 1 to 3) {
- Files.write(s"$i", new File(dirPath, s"part-r-0000$i"),
StandardCharsets.UTF_8)
+ Files.asCharSink(new File(dirPath, s"part-r-0000$i"),
StandardCharsets.UTF_8).write(s"$i")
}
withTable("load_t2") {
sql("CREATE TABLE load_t2 (a STRING) USING hive")
@@ -2039,7 +2039,8 @@ abstract class SQLQuerySuiteBase extends QueryTest with
SQLTestUtils with TestHi
withTempDir { dir =>
val path = dir.toURI.toString.stripSuffix("/")
val dirPath = dir.getAbsoluteFile
- Files.append("1", new File(dirPath, "part-r-000011"),
StandardCharsets.UTF_8)
+ Files.asCharSink(
+ new File(dirPath, "part-r-000011"), StandardCharsets.UTF_8,
FileWriteMode.APPEND).write("1")
withTable("part_table") {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(
diff --git
a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
index f8d961fa8dd8..73c2e89f3729 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -1641,7 +1641,7 @@ public class JavaAPISuite extends
LocalJavaStreamingContext implements Serializa
private static List<List<String>> fileTestPrepare(File testDir) throws
IOException {
File existingFile = new File(testDir, "0");
- Files.write("0\n", existingFile, StandardCharsets.UTF_8);
+ Files.asCharSink(existingFile, StandardCharsets.UTF_8).write("0\n");
Assertions.assertTrue(existingFile.setLastModified(1000));
Assertions.assertEquals(1000, existingFile.lastModified());
return Arrays.asList(Arrays.asList("0"));
diff --git
a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 43b0835df7cb..4aeb0e043a97 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -649,7 +649,7 @@ class CheckpointSuite extends TestSuiteBase with
LocalStreamingContext with DStr
*/
def writeFile(i: Int, clock: Clock): Unit = {
val file = new File(testDir, i.toString)
- Files.write(s"$i\n", file, StandardCharsets.UTF_8)
+ Files.asCharSink(file, StandardCharsets.UTF_8).write(s"$i\n")
assert(file.setLastModified(clock.getTimeMillis()))
// Check that the file's modification date is actually the value we
wrote, since rounding or
// truncation will break the test:
diff --git
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 66fd1ac7bb22..64335a96045b 100644
---
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -132,7 +132,7 @@ class InputStreamsSuite extends TestSuiteBase with
BeforeAndAfter {
val batchDuration = Seconds(2)
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
- Files.write("0\n", existingFile, StandardCharsets.UTF_8)
+ Files.asCharSink(existingFile, StandardCharsets.UTF_8).write("0\n")
assert(existingFile.setLastModified(10000) && existingFile.lastModified
=== 10000)
// Set up the streaming context and input streams
@@ -191,7 +191,7 @@ class InputStreamsSuite extends TestSuiteBase with
BeforeAndAfter {
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
- Files.write("0\n", existingFile, StandardCharsets.UTF_8)
+ Files.asCharSink(existingFile, StandardCharsets.UTF_8).write("0\n")
assert(existingFile.setLastModified(10000) && existingFile.lastModified
=== 10000)
val pathWithWildCard = testDir.toString + "/*/"
@@ -215,7 +215,7 @@ class InputStreamsSuite extends TestSuiteBase with
BeforeAndAfter {
def createFileAndAdvanceTime(data: Int, dir: File): Unit = {
val file = new File(testSubDir1, data.toString)
- Files.write(s"$data\n", file, StandardCharsets.UTF_8)
+ Files.asCharSink(file, StandardCharsets.UTF_8).write(s"$data\n")
assert(file.setLastModified(clock.getTimeMillis()))
assert(file.lastModified === clock.getTimeMillis())
logInfo(s"Created file $file")
@@ -478,7 +478,7 @@ class InputStreamsSuite extends TestSuiteBase with
BeforeAndAfter {
val batchDuration = Seconds(2)
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
- Files.write("0\n", existingFile, StandardCharsets.UTF_8)
+ Files.asCharSink(existingFile, StandardCharsets.UTF_8).write("0\n")
assert(existingFile.setLastModified(10000) && existingFile.lastModified
=== 10000)
// Set up the streaming context and input streams
@@ -502,7 +502,7 @@ class InputStreamsSuite extends TestSuiteBase with
BeforeAndAfter {
val input = Seq(1, 2, 3, 4, 5)
input.foreach { i =>
val file = new File(testDir, i.toString)
- Files.write(s"$i\n", file, StandardCharsets.UTF_8)
+ Files.asCharSink(file, StandardCharsets.UTF_8).write(s"$i\n")
assert(file.setLastModified(clock.getTimeMillis()))
assert(file.lastModified === clock.getTimeMillis())
logInfo("Created file " + file)
diff --git
a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 771e65ed40b5..2dc43a231d9b 100644
---
a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++
b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -375,7 +375,7 @@ class FileGeneratingThread(input: Seq[String], testDir:
Path, interval: Long)
val localFile = new File(localTestDir, (i + 1).toString)
val hadoopFile = new Path(testDir, (i + 1).toString)
val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString)
- Files.write(input(i) + "\n", localFile, StandardCharsets.UTF_8)
+ Files.asCharSink(localFile, StandardCharsets.UTF_8).write(input(i) +
"\n")
var tries = 0
var done = false
while (!done && tries < maxTries) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]