This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 097114811 [#1818] fix(spark3): Avoid calling RssShuffleDataIterator.cleanup multiple times (#1819)
097114811 is described below
commit 097114811620a68254929133e8d22a0250681239
Author: Zhen Wang <[email protected]>
AuthorDate: Tue Jun 25 10:09:30 2024 +0800
[#1818] fix(spark3): Avoid calling RssShuffleDataIterator.cleanup multiple times (#1819)
### What changes were proposed in this pull request?
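Avoid calling `RssShuffleDataIterator.cleanup` multiple times. The `CompletionIterator` completion callback merges shuffle read metrics and calls `cleanup`; it is now wrapped in a new `FunctionUtils.once` helper so that it runs at most once.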
### Why are the changes needed?
Fix: #1818
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs, plus a new unit test, `FunctionUtilsTests`.
---
.../org/apache/spark/shuffle/FunctionUtils.java | 39 ++++++++++++++++++++++
.../spark/shuffle/reader/RssShuffleReader.java | 10 +++---
.../apache/spark/shuffle/FunctionUtilsTests.java | 39 ++++++++++++++++++++++
3 files changed, 84 insertions(+), 4 deletions(-)
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/FunctionUtils.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/FunctionUtils.java
new file mode 100644
index 000000000..9ed95e01d
--- /dev/null
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/FunctionUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+import scala.Function0;
+
+/** Utilities for composing Scala {@code Function0} callbacks from Java code. */
+public class FunctionUtils {
+  /** Wraps {@code f} so it is invoked at most once; every call returns that first result. */
+  public static <T> Function0<T> once(Function0<T> f) {
+    return new Function0<T>() {
+      private T value; // guarded by this wrapper's monitor
+      private boolean computed = false; // guarded by this wrapper's monitor
+      @Override
+      public synchronized T apply() { // makes check-then-act atomic across threads
+        if (!computed) {
+          computed = true; // set first: if f throws/re-enters, callers get null, not a rerun
+          value = f.apply();
+        }
+        return value;
+      }
+    };
+  }
+}
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index d76abc4f7..bf47ced6b 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -39,6 +39,7 @@ import org.apache.spark.ShuffleDependency;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleReadMetrics;
import org.apache.spark.serializer.Serializer;
+import org.apache.spark.shuffle.FunctionUtils;
import org.apache.spark.shuffle.RssShuffleHandle;
import org.apache.spark.shuffle.ShuffleReader;
import org.apache.spark.util.CompletionIterator;
@@ -285,10 +286,11 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
CompletionIterator<Product2<K, C>, RssShuffleDataIterator<K, C>>
completionIterator =
CompletionIterator$.MODULE$.apply(
iterator,
- () -> {
- context.taskMetrics().mergeShuffleReadMetrics();
- return iterator.cleanup();
- });
+ FunctionUtils.once(
+ () -> {
+ context.taskMetrics().mergeShuffleReadMetrics();
+ return iterator.cleanup();
+ }));
iterators.add(completionIterator);
}
iterator = iterators.iterator();
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/FunctionUtilsTests.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/FunctionUtilsTests.java
new file mode 100644
index 000000000..7640f504e
--- /dev/null
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/FunctionUtilsTests.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import scala.Function0;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class FunctionUtilsTests {
+  /** {@link FunctionUtils#once} must invoke its delegate once and replay that first result. */
+  @Test
+  public void testOnceFunction0() {
+    AtomicInteger count = new AtomicInteger(0);
+    Function0<Integer> once = FunctionUtils.once(count::incrementAndGet);
+    assertEquals(1, (int) once.apply());
+    assertEquals(1, (int) once.apply());
+    assertEquals(1, (int) once.apply());
+    assertEquals(1, count.get());
+  }
+}