This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 093fbf1aa852 [SPARK-45831][CORE][SQL][DSTREAM] Use collection factory
instead to create immutable Java collections
093fbf1aa852 is described below
commit 093fbf1aa8520193b8d929f9f855afe0aded20a1
Author: yangjie01 <[email protected]>
AuthorDate: Wed Nov 8 19:23:29 2023 -0800
[SPARK-45831][CORE][SQL][DSTREAM] Use collection factory instead to create
immutable Java collections
### What changes were proposed in this pull request?
This PR changes the code to use collection factories instead of
`Collections.unmodifiable*` to create immutable Java collections (the new
collection APIs introduced by [JEP 269](https://openjdk.org/jeps/269)).
### Why are the changes needed?
Makes the relevant code simpler and clearer.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GitHub Actions
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43709 from LuciferYang/collection-factory.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/network/util/JavaUtils.java | 44 +++++++++++-----------
.../main/scala/org/apache/spark/FutureAction.scala | 5 +--
.../org/apache/spark/util/AccumulatorV2.scala | 3 +-
.../parquet/SpecificParquetRecordReaderBase.java | 5 +--
.../org/apache/spark/streaming/JavaAPISuite.java | 4 +-
5 files changed, 26 insertions(+), 35 deletions(-)
diff --git
a/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
b/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
index bbe764b8366c..fa0a2629f350 100644
--- a/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -202,29 +202,27 @@ public class JavaUtils {
private static final Map<String, ByteUnit> byteSuffixes;
static {
- final Map<String, TimeUnit> timeSuffixesBuilder = new HashMap<>();
- timeSuffixesBuilder.put("us", TimeUnit.MICROSECONDS);
- timeSuffixesBuilder.put("ms", TimeUnit.MILLISECONDS);
- timeSuffixesBuilder.put("s", TimeUnit.SECONDS);
- timeSuffixesBuilder.put("m", TimeUnit.MINUTES);
- timeSuffixesBuilder.put("min", TimeUnit.MINUTES);
- timeSuffixesBuilder.put("h", TimeUnit.HOURS);
- timeSuffixesBuilder.put("d", TimeUnit.DAYS);
- timeSuffixes = Collections.unmodifiableMap(timeSuffixesBuilder);
-
- final Map<String, ByteUnit> byteSuffixesBuilder = new HashMap<>();
- byteSuffixesBuilder.put("b", ByteUnit.BYTE);
- byteSuffixesBuilder.put("k", ByteUnit.KiB);
- byteSuffixesBuilder.put("kb", ByteUnit.KiB);
- byteSuffixesBuilder.put("m", ByteUnit.MiB);
- byteSuffixesBuilder.put("mb", ByteUnit.MiB);
- byteSuffixesBuilder.put("g", ByteUnit.GiB);
- byteSuffixesBuilder.put("gb", ByteUnit.GiB);
- byteSuffixesBuilder.put("t", ByteUnit.TiB);
- byteSuffixesBuilder.put("tb", ByteUnit.TiB);
- byteSuffixesBuilder.put("p", ByteUnit.PiB);
- byteSuffixesBuilder.put("pb", ByteUnit.PiB);
- byteSuffixes = Collections.unmodifiableMap(byteSuffixesBuilder);
+ timeSuffixes = Map.of(
+ "us", TimeUnit.MICROSECONDS,
+ "ms", TimeUnit.MILLISECONDS,
+ "s", TimeUnit.SECONDS,
+ "m", TimeUnit.MINUTES,
+ "min", TimeUnit.MINUTES,
+ "h", TimeUnit.HOURS,
+ "d", TimeUnit.DAYS);
+
+ byteSuffixes = Map.ofEntries(
+ Map.entry("b", ByteUnit.BYTE),
+ Map.entry("k", ByteUnit.KiB),
+ Map.entry("kb", ByteUnit.KiB),
+ Map.entry("m", ByteUnit.MiB),
+ Map.entry("mb", ByteUnit.MiB),
+ Map.entry("g", ByteUnit.GiB),
+ Map.entry("gb", ByteUnit.GiB),
+ Map.entry("t", ByteUnit.TiB),
+ Map.entry("tb", ByteUnit.TiB),
+ Map.entry("p", ByteUnit.PiB),
+ Map.entry("pb", ByteUnit.PiB));
}
/**
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala
b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 9100d4ce041b..a68700421b8d 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -17,7 +17,6 @@
package org.apache.spark
-import java.util.Collections
import java.util.concurrent.TimeUnit
import scala.concurrent._
@@ -255,8 +254,6 @@ private[spark]
class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter:
S => T)
extends JavaFutureAction[T] {
- import scala.jdk.CollectionConverters._
-
override def isCancelled: Boolean = futureAction.isCancelled
override def isDone: Boolean = {
@@ -266,7 +263,7 @@ class JavaFutureActionWrapper[S, T](futureAction:
FutureAction[S], converter: S
}
override def jobIds(): java.util.List[java.lang.Integer] = {
-
Collections.unmodifiableList(futureAction.jobIds.map(Integer.valueOf).asJava)
+ java.util.List.of(futureAction.jobIds.map(Integer.valueOf): _*)
}
private def getImpl(timeout: Duration): T = {
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 181033c9d20c..c6d8073a0c2f 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -19,7 +19,6 @@ package org.apache.spark.util
import java.{lang => jl}
import java.io.ObjectInputStream
-import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
@@ -505,7 +504,7 @@ class CollectionAccumulator[T] extends AccumulatorV2[T,
java.util.List[T]] {
}
override def value: java.util.List[T] = this.synchronized {
- java.util.Collections.unmodifiableList(new ArrayList[T](getOrCreate))
+ java.util.List.copyOf(getOrCreate)
}
private[spark] def setValue(newValue: java.util.List[T]): Unit =
this.synchronized {
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 4f2b65f36120..6d00048154a5 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -238,9 +237,7 @@ public abstract class SpecificParquetRecordReaderBase<T>
extends RecordReader<Vo
private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
Map<K, Set<V>> setMultiMap = new HashMap<>();
for (Map.Entry<K, V> entry : map.entrySet()) {
- Set<V> set = new HashSet<>();
- set.add(entry.getValue());
- setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set));
+ setMultiMap.put(entry.getKey(), Set.of(entry.getValue()));
}
return Collections.unmodifiableMap(setMultiMap);
}
diff --git
a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
index b1f743b92196..f8d961fa8dd8 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -704,11 +704,11 @@ public class JavaAPISuite extends
LocalJavaStreamingContext implements Serializa
List<List<T>> expected, List<List<T>> actual) {
List<Set<T>> expectedSets = new ArrayList<>();
for (List<T> list: expected) {
- expectedSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
+ expectedSets.add(Set.copyOf(list));
}
List<Set<T>> actualSets = new ArrayList<>();
for (List<T> list: actual) {
- actualSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
+ actualSets.add(Set.copyOf(list));
}
Assertions.assertEquals(expectedSets, actualSets);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]