This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 15ae6787ec4 [HUDI-9219] Add callbacks for CloseableIterators and eager
close data readers (#13178)
15ae6787ec4 is described below
commit 15ae6787ec4fa6ddad0d30c0c81e81b4a41b477e
Author: Tim Brown <[email protected]>
AuthorDate: Mon Apr 28 23:13:50 2025 -0500
[HUDI-9219] Add callbacks for CloseableIterators and eager close data
readers (#13178)
---
.../MultipleSparkJobExecutionStrategy.java | 7 +-
.../hudi/data/CloseableIteratorListener.java | 66 +++++++++++
.../org/apache/hudi/data/HoodieJavaPairRDD.java | 2 +-
.../java/org/apache/hudi/data/HoodieJavaRDD.java | 6 +-
.../apache/hudi/data/TestHoodieJavaPairRDD.java | 12 ++
.../org/apache/hudi/data/TestHoodieJavaRDD.java | 36 ++++++
.../hudi/data/TrackingCloseableIterator.java | 62 +++++++++++
.../hudi/common/data/HoodieBaseListData.java | 35 +++++-
.../apache/hudi/common/data/HoodieListData.java | 20 ++--
.../hudi/common/data/HoodieListPairData.java | 124 +++++++++++----------
.../common/table/read/HoodieFileGroupReader.java | 14 ++-
.../io/storage/HoodieAvroHFileReaderImplBase.java | 10 +-
.../hudi/common/data/CloseValidationIterator.java | 56 ++++++++++
.../hudi/common/data/TestHoodieListData.java | 38 +++++++
.../common/data/TestHoodieListDataPairData.java | 46 ++++++++
.../hadoop/TestHoodieHBaseHFileReaderWriter.java | 17 +--
.../io/hadoop/TestHoodieHFileReaderWriter.java | 17 +--
.../hudi/io/hadoop/TestOrcReaderIterator.java | 49 ++++----
.../datasources/HoodieMultipleBaseFileFormat.scala | 4 +-
...odieFileGroupReaderBasedParquetFileFormat.scala | 5 +-
.../procedures/PartitionBucketIndexManager.scala | 2 +
21 files changed, 503 insertions(+), 125 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 147c3c2b687..f57367f502d 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -55,6 +55,7 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.CloseableIteratorListener;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieException;
@@ -335,7 +336,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
};
suppliers.add(iteratorSupplier);
});
- return new LazyConcatenatingIterator<>(suppliers);
+ return CloseableIteratorListener.addListener(new
LazyConcatenatingIterator<>(suppliers));
}));
}
@@ -357,7 +358,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
iteratorGettersForPartition.add(recordIteratorGetter);
});
- return new LazyConcatenatingIterator<>(iteratorGettersForPartition);
+ return CloseableIteratorListener.addListener(new
LazyConcatenatingIterator<>(iteratorGettersForPartition));
}));
}
@@ -477,7 +478,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
0, Long.MAX_VALUE, usePosition, false);
fileGroupReader.initRecordIterators();
// read records from the FG reader
- return fileGroupReader.getClosableIterator();
+ return
CloseableIteratorListener.addListener(fileGroupReader.getClosableIterator());
}
}).rdd();
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/CloseableIteratorListener.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/CloseableIteratorListener.java
new file mode 100644
index 00000000000..c018aeccfa3
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/CloseableIteratorListener.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data;
+
+import org.apache.spark.TaskContext;
+import org.apache.spark.util.TaskCompletionListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+
+/**
+ * Helper class for adding a Spark task completion listener that will ensure
the iterator is closed if it is an instance of {@link AutoCloseable}.
+ * This is commonly used with {@link
org.apache.hudi.common.util.collection.ClosableIterator} to ensure the
resources are closed after the task completes.
+ */
+public class CloseableIteratorListener implements TaskCompletionListener {
+ private static final Logger LOG =
LoggerFactory.getLogger(CloseableIteratorListener.class);
+ private final Object iterator;
+
+ private CloseableIteratorListener(Object iterator) {
+ this.iterator = iterator;
+ }
+
+ public static <T> Iterator<T> addListener(Iterator<T> iterator) {
+ TaskContext.get().addTaskCompletionListener(new
CloseableIteratorListener(iterator));
+ return iterator;
+ }
+
+ public static <T> scala.collection.Iterator<T>
addListener(scala.collection.Iterator<T> iterator) {
+ TaskContext.get().addTaskCompletionListener(new
CloseableIteratorListener(iterator));
+ return iterator;
+ }
+
+ /**
+ * Closes the iterator if it also implements {@link AutoCloseable},
otherwise it is a no-op.
+ *
+ * @param context the task context for the currently running Spark task
+ */
+ @Override
+ public void onTaskCompletion(TaskContext context) {
+ if (iterator instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) iterator).close();
+ } catch (Exception ex) {
+ LOG.warn("Failed to properly close iterator", ex);
+ }
+ }
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
index fbcab6b575e..2dbbb1880bf 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
@@ -131,7 +131,7 @@ public class HoodieJavaPairRDD<K, V> implements
HoodiePairData<K, V> {
}
public <W> HoodiePairData<K, W> flatMapValues(SerializableFunction<V,
Iterator<W>> func) {
- return HoodieJavaPairRDD.of(pairRDDData.flatMapValues(func::apply));
+ return HoodieJavaPairRDD.of(pairRDDData.flatMapValues(iter ->
CloseableIteratorListener.addListener(func.apply(iter))));
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index faec42368ca..0eca77693db 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -150,21 +150,21 @@ public class HoodieJavaRDD<T> implements HoodieData<T> {
@Override
public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>,
Iterator<O>> func, boolean preservesPartitioning) {
- return HoodieJavaRDD.of(rddData.mapPartitions(func::apply,
preservesPartitioning));
+ return HoodieJavaRDD.of(rddData.mapPartitions(iter ->
CloseableIteratorListener.addListener(func.apply(iter)),
preservesPartitioning));
}
@Override
public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
// NOTE: Unrolling this lambda into a method reference results in
[[ClassCastException]]
// due to weird interop b/w Scala and Java
- return HoodieJavaRDD.of(rddData.flatMap(e -> func.apply(e)));
+ return HoodieJavaRDD.of(rddData.flatMap(e ->
CloseableIteratorListener.addListener(func.apply(e))));
}
@Override
public <K, V> HoodiePairData<K, V> flatMapToPair(SerializableFunction<T,
Iterator<? extends Pair<K, V>>> func) {
return HoodieJavaPairRDD.of(
rddData.flatMapToPair(e ->
- new MappingIterator<>(func.apply(e), p -> new Tuple2<>(p.getKey(),
p.getValue()))));
+ new
MappingIterator<>(CloseableIteratorListener.addListener(func.apply(e)), p ->
new Tuple2<>(p.getKey(), p.getValue()))));
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaPairRDD.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaPairRDD.java
index 75bc888a71d..3c11b1279e4 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaPairRDD.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaPairRDD.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import scala.Tuple2;
@@ -107,4 +108,15 @@ public class TestHoodieJavaPairRDD {
assertEquals(Option.of("value1"), item.getRight().getRight());
});
}
+
+ @Test
+ void testFlatMapValuesWithCloseable() {
+ String partition1 = "partition1";
+ String partition2 = "partition2";
+ HoodiePairData<Integer, String> input =
HoodieJavaPairRDD.of(jsc.parallelizePairs(Arrays.asList(Tuple2.apply(1,
partition1), Tuple2.apply(2, partition2)), 2));
+ input.flatMapValues(partition -> new
TrackingCloseableIterator<>(partition, Collections.singletonList(1).iterator()))
+ .collectAsList();
+ assertTrue(TrackingCloseableIterator.isClosed(partition1));
+ assertTrue(TrackingCloseableIterator.isClosed(partition2));
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
index a2617b592d6..cbaf7fa604d 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
@@ -27,10 +27,13 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieJavaRDD extends HoodieClientTestBase {
@Test
@@ -65,4 +68,37 @@ public class TestHoodieJavaRDD extends HoodieClientTestBase {
.reduceByKey((p1, p2) -> p1, 11);
assertEquals(11, shuffleRDD.deduceNumPartitions());
}
+
+ @Test
+ void testMapPartitionsWithCloseable() {
+ String partition1 = "partition1";
+ String partition2 = "partition2";
+ HoodieData<String> input = HoodieJavaRDD.of(Arrays.asList(partition1,
partition2), context, 2);
+ input.mapPartitions(partition -> new
TrackingCloseableIterator<>(partition.next(),
Collections.singletonList("a").iterator()), true)
+ .collectAsList();
+ assertTrue(TrackingCloseableIterator.isClosed(partition1));
+ assertTrue(TrackingCloseableIterator.isClosed(partition2));
+ }
+
+ @Test
+ void testFlatMapWithCloseable() {
+ String partition1 = "partition1";
+ String partition2 = "partition2";
+ HoodieData<String> input = HoodieJavaRDD.of(Arrays.asList(partition1,
partition2), context, 2);
+ input.flatMap(partition -> new TrackingCloseableIterator<>(partition,
Collections.singletonList("a").iterator()))
+ .collectAsList();
+ assertTrue(TrackingCloseableIterator.isClosed(partition1));
+ assertTrue(TrackingCloseableIterator.isClosed(partition2));
+ }
+
+ @Test
+ void testFlatMapToPairWithCloseable() {
+ String partition1 = "partition1";
+ String partition2 = "partition2";
+ HoodieData<String> input = HoodieJavaRDD.of(Arrays.asList(partition1,
partition2), context, 2);
+ input.flatMapToPair(partition -> new
TrackingCloseableIterator<>(partition, Collections.singletonList(Pair.of(1,
"1")).iterator()))
+ .collectAsList();
+ assertTrue(TrackingCloseableIterator.isClosed(partition1));
+ assertTrue(TrackingCloseableIterator.isClosed(partition2));
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TrackingCloseableIterator.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TrackingCloseableIterator.java
new file mode 100644
index 00000000000..d06ec7766f3
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TrackingCloseableIterator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data;
+
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Closeable iterator to use in Spark-related testing to ensure that the close
method is properly called after transformations.
+ * @param <T> type of record within the iterator
+ */
+class TrackingCloseableIterator<T> implements ClosableIterator<T>,
Serializable {
+ private static final Map<String, Boolean> IS_CLOSED_BY_ID = new HashMap<>();
+ private final String id;
+ private final Iterator<T> inner;
+
+ public TrackingCloseableIterator(String id, Iterator<T> inner) {
+ this.id = id;
+ this.inner = inner;
+ IS_CLOSED_BY_ID.put(id, false);
+ }
+
+ public static boolean isClosed(String id) {
+ return IS_CLOSED_BY_ID.get(id);
+ }
+
+ @Override
+ public void close() {
+ IS_CLOSED_BY_ID.put(id, true);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return inner.hasNext();
+ }
+
+ @Override
+ public T next() {
+ return inner.next();
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java
index 6f3dbfcef99..b603b99d930 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java
@@ -21,6 +21,10 @@ package org.apache.hudi.common.data;
import org.apache.hudi.common.util.Either;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -43,7 +47,12 @@ public abstract class HoodieBaseListData<T> {
protected HoodieBaseListData(Stream<T> dataStream, boolean lazy) {
// NOTE: In case this container is being instantiated by an eager parent,
we have to
// pre-materialize the stream
- this.data = lazy ? Either.left(dataStream) :
Either.right(dataStream.collect(Collectors.toList()));
+ if (lazy) {
+ this.data = Either.left(dataStream);
+ } else {
+ this.data = Either.right(dataStream.collect(Collectors.toList()));
+ dataStream.close();
+ }
this.lazy = lazy;
}
@@ -69,9 +78,31 @@ public abstract class HoodieBaseListData<T> {
protected List<T> collectAsList() {
if (lazy) {
- return data.asLeft().collect(Collectors.toList());
+ try (Stream<T> stream = data.asLeft()) {
+ return stream.collect(Collectors.toList());
+ }
} else {
return data.asRight();
}
}
+
+ static class IteratorCloser implements Runnable {
+ private static final Logger LOG =
LoggerFactory.getLogger(IteratorCloser.class);
+ private final Iterator<?> iterator;
+
+ IteratorCloser(Iterator<?> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public void run() {
+ if (iterator instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) iterator).close();
+ } catch (Exception ex) {
+ LOG.warn("Failed to properly close iterator", ex);
+ }
+ }
+ }
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
index 690ab71c090..5eebf2a2401 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
@@ -119,10 +119,12 @@ public class HoodieListData<T> extends
HoodieBaseListData<T> implements HoodieDa
@Override
public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>,
Iterator<O>> func, boolean preservesPartitioning) {
Function<Iterator<T>, Iterator<O>> mapper = throwingMapWrapper(func);
+ Iterator<T> iterator = asStream().iterator();
+ Iterator<O> newIterator = mapper.apply(iterator);
return new HoodieListData<>(
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
- mapper.apply(asStream().iterator()), Spliterator.ORDERED),
true),
+ newIterator, Spliterator.ORDERED), true).onClose(new
IteratorCloser(newIterator)),
lazy
);
}
@@ -130,18 +132,22 @@ public class HoodieListData<T> extends
HoodieBaseListData<T> implements HoodieDa
@Override
public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
Function<T, Iterator<O>> mapper = throwingMapWrapper(func);
- Stream<O> mappedStream = asStream().flatMap(e ->
- StreamSupport.stream(
- Spliterators.spliteratorUnknownSize(mapper.apply(e),
Spliterator.ORDERED), true));
+ Stream<O> mappedStream = asStream().flatMap(e -> {
+ Iterator<O> iterator = mapper.apply(e);
+ return StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
true).onClose(new IteratorCloser(iterator));
+ });
return new HoodieListData<>(mappedStream, lazy);
}
@Override
public <K, V> HoodiePairData<K, V> flatMapToPair(SerializableFunction<T,
Iterator<? extends Pair<K, V>>> func) {
Function<T, Iterator<? extends Pair<K, V>>> mapper =
throwingMapWrapper(func);
- Stream<Pair<K, V>> mappedStream = asStream().flatMap(e ->
- StreamSupport.stream(
- Spliterators.spliteratorUnknownSize(mapper.apply(e),
Spliterator.ORDERED), true));
+ Stream<Pair<K, V>> mappedStream = asStream().flatMap(e -> {
+ Iterator<? extends Pair<K, V>> iterator = mapper.apply(e);
+ return StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
true).onClose(new IteratorCloser(iterator));
+ });
return new HoodieListPairData<>(mappedStream, lazy);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
index ebf7207c84e..dd5bb224962 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
@@ -105,7 +105,9 @@ public class HoodieListPairData<K, V> extends
HoodieBaseListData<Pair<K, V>> imp
@Override
public Map<K, Long> countByKey() {
- return asStream().collect(Collectors.groupingBy(Pair::getKey,
Collectors.counting()));
+ try (Stream<Pair<K, V>> stream = asStream()) {
+ return stream.collect(Collectors.groupingBy(Pair::getKey,
Collectors.counting()));
+ }
}
@Override
@@ -114,27 +116,31 @@ public class HoodieListPairData<K, V> extends
HoodieBaseListData<Pair<K, V>> imp
Collector<Pair<K, V>, ?, Map<K, List<V>>> groupingCollector =
Collectors.groupingBy(Pair::getKey, mappingCollector);
- Map<K, List<V>> groupedByKey = asStream().collect(groupingCollector);
- return new HoodieListPairData<>(
- groupedByKey.entrySet().stream().map(e -> Pair.of(e.getKey(),
e.getValue())),
- lazy
- );
+ try (Stream<Pair<K, V>> s = asStream()) {
+ Map<K, List<V>> groupedByKey = s.collect(groupingCollector);
+ return new HoodieListPairData<>(
+ groupedByKey.entrySet().stream().map(e -> Pair.of(e.getKey(),
e.getValue())),
+ lazy
+ );
+ }
}
@Override
public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V>
combiner, int parallelism) {
- Map<K, java.util.Optional<V>> reducedMap = asStream().collect(
- Collectors.groupingBy(
- Pair::getKey,
- HashMap::new,
- Collectors.mapping(Pair::getValue,
Collectors.reducing(combiner::apply))));
-
- return new HoodieListPairData<>(
- reducedMap.entrySet()
- .stream()
- .map(e -> Pair.of(e.getKey(), e.getValue().orElse(null))),
- lazy
- );
+ try (Stream<Pair<K, V>> stream = asStream()) {
+ Map<K, java.util.Optional<V>> reducedMap = stream.collect(
+ Collectors.groupingBy(
+ Pair::getKey,
+ HashMap::new,
+ Collectors.mapping(Pair::getValue,
Collectors.reducing(combiner::apply))));
+
+ return new HoodieListPairData<>(
+ reducedMap.entrySet()
+ .stream()
+ .map(e -> Pair.of(e.getKey(), e.getValue().orElse(null))),
+ lazy
+ );
+ }
}
@Override
@@ -158,7 +164,7 @@ public class HoodieListPairData<K, V> extends
HoodieBaseListData<Pair<K, V>> imp
new MappingIterator<>(mappedValuesIterator, w -> Pair.of(p.getKey(),
w));
return StreamSupport.stream(
- Spliterators.spliteratorUnknownSize(mappedPairsIterator,
Spliterator.ORDERED), true);
+ Spliterators.spliteratorUnknownSize(mappedPairsIterator,
Spliterator.ORDERED), true).onClose(new IteratorCloser(mappedValuesIterator));
}), lazy);
}
@@ -172,26 +178,28 @@ public class HoodieListPairData<K, V> extends
HoodieBaseListData<Pair<K, V>> imp
ValidationUtils.checkArgument(other instanceof HoodieListPairData);
// Transform right-side container to a multi-map of [[K]] to [[List<W>]]
values
- HashMap<K, List<W>> rightStreamMap = ((HoodieListPairData<K, W>)
other).asStream().collect(
- Collectors.groupingBy(
- Pair::getKey,
- HashMap::new,
- Collectors.mapping(Pair::getValue, Collectors.toList())));
-
- Stream<Pair<K, Pair<V, Option<W>>>> leftOuterJoined =
asStream().flatMap(pair -> {
- K key = pair.getKey();
- V leftValue = pair.getValue();
- List<W> rightValues = rightStreamMap.get(key);
-
- if (rightValues == null) {
- return Stream.of(Pair.of(key, Pair.of(leftValue, Option.empty())));
- } else {
- return rightValues.stream().map(rightValue ->
- Pair.of(key, Pair.of(leftValue, Option.of(rightValue))));
- }
- });
-
- return new HoodieListPairData<>(leftOuterJoined, lazy);
+ try (Stream<Pair<K, W>> stream = ((HoodieListPairData<K, W>)
other).asStream()) {
+ HashMap<K, List<W>> rightStreamMap = stream.collect(
+ Collectors.groupingBy(
+ Pair::getKey,
+ HashMap::new,
+ Collectors.mapping(Pair::getValue, Collectors.toList())));
+
+ Stream<Pair<K, Pair<V, Option<W>>>> leftOuterJoined =
asStream().flatMap(pair -> {
+ K key = pair.getKey();
+ V leftValue = pair.getValue();
+ List<W> rightValues = rightStreamMap.get(key);
+
+ if (rightValues == null) {
+ return Stream.of(Pair.of(key, Pair.of(leftValue, Option.empty())));
+ } else {
+ return rightValues.stream().map(rightValue ->
+ Pair.of(key, Pair.of(leftValue, Option.of(rightValue))));
+ }
+ });
+
+ return new HoodieListPairData<>(leftOuterJoined, lazy);
+ }
}
@Override
@@ -206,24 +214,26 @@ public class HoodieListPairData<K, V> extends
HoodieBaseListData<Pair<K, V>> imp
ValidationUtils.checkArgument(other instanceof HoodieListPairData);
// Transform right-side container to a multi-map of [[K]] to [[List<W>]]
values
- HashMap<K, List<W>> rightStreamMap = ((HoodieListPairData<K, W>)
other).asStream().collect(
- Collectors.groupingBy(
- Pair::getKey,
- HashMap::new,
- Collectors.mapping(Pair::getValue, Collectors.toList())));
-
- List<Pair<K, Pair<V, W>>> joinResult = new ArrayList<>();
- asStream().forEach(pair -> {
- K key = pair.getKey();
- V leftValue = pair.getValue();
- List<W> rightValues = rightStreamMap.getOrDefault(key,
Collections.emptyList());
-
- for (W rightValue : rightValues) {
- joinResult.add(Pair.of(key, Pair.of(leftValue, rightValue)));
- }
- });
-
- return new HoodieListPairData<>(joinResult, lazy);
+ try (Stream<Pair<K, W>> stream = ((HoodieListPairData<K, W>)
other).asStream()) {
+ HashMap<K, List<W>> rightStreamMap = stream.collect(
+ Collectors.groupingBy(
+ Pair::getKey,
+ HashMap::new,
+ Collectors.mapping(Pair::getValue, Collectors.toList())));
+
+ List<Pair<K, Pair<V, W>>> joinResult = new ArrayList<>();
+ asStream().forEach(pair -> {
+ K key = pair.getKey();
+ V leftValue = pair.getValue();
+ List<W> rightValues = rightStreamMap.getOrDefault(key,
Collections.emptyList());
+
+ for (W rightValue : rightValues) {
+ joinResult.add(Pair.of(key, Pair.of(leftValue, rightValue)));
+ }
+ });
+
+ return new HoodieListPairData<>(joinResult, lazy);
+ }
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index a57fb777fbd..570d9580ca0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -348,12 +348,14 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
@Override
public void close() {
- try {
- reader.close();
- } catch (IOException e) {
- throw new HoodieIOException("Failed to close the reader", e);
- } finally {
- this.reader = null;
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to close the reader", e);
+ } finally {
+ this.reader = null;
+ }
}
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
index 143d3ab0168..4dba4d840f0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
@@ -54,8 +54,9 @@ public abstract class HoodieAvroHFileReaderImplBase extends
HoodieAvroFileReader
public static List<IndexedRecord> readAllRecords(HoodieAvroFileReader reader)
throws IOException {
Schema schema = reader.getSchema();
- return toStream(reader.getIndexedRecordIterator(schema))
- .collect(Collectors.toList());
+ try (ClosableIterator<IndexedRecord> indexedRecordIterator =
reader.getIndexedRecordIterator(schema)) {
+ return toStream(indexedRecordIterator).collect(Collectors.toList());
+ }
}
/**
@@ -77,8 +78,9 @@ public abstract class HoodieAvroHFileReaderImplBase extends
HoodieAvroFileReader
List<String> keys,
Schema schema) throws
IOException {
Collections.sort(keys);
- return toStream(reader.getIndexedRecordsByKeysIterator(keys, schema))
- .collect(Collectors.toList());
+ try (ClosableIterator<IndexedRecord> indexedRecordsByKeysIterator =
reader.getIndexedRecordsByKeysIterator(keys, schema)) {
+ return
toStream(indexedRecordsByKeysIterator).collect(Collectors.toList());
+ }
}
public abstract ClosableIterator<IndexedRecord>
getIndexedRecordsByKeysIterator(List<String> keys,
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/data/CloseValidationIterator.java
b/hudi-common/src/test/java/org/apache/hudi/common/data/CloseValidationIterator.java
new file mode 100644
index 00000000000..b61bdd6536f
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/data/CloseValidationIterator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import java.util.Iterator;
+
+/**
+ * Implementation of a {@link ClosableIterator} to help validate that the
close method is properly called.
+ * @param <T> type of record within the iterator
+ */
+class CloseValidationIterator<T> implements ClosableIterator<T> {
+ private final Iterator<T> inner;
+ private boolean isClosed = false;
+
+ public CloseValidationIterator(Iterator<T> inner) {
+ this.inner = inner;
+ }
+
+ public boolean isClosed() {
+ return isClosed;
+ }
+
+ @Override
+ public void close() {
+ isClosed = true;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return inner.hasNext();
+ }
+
+ @Override
+ public T next() {
+ return inner.next();
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
index 795318f5e01..24bd9a90949 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
@@ -94,4 +95,41 @@ class TestHoodieListData {
emptyListData = HoodieListData.lazy(Collections.emptyList());
assertTrue(emptyListData.isEmpty());
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testMapPartitionsWithCloseable(boolean isLazy) {
+ String partition1 = "partition1";
+ String partition2 = "partition2";
+ HoodieData<String> input = new HoodieListData<>(Stream.of(partition1,
partition2), isLazy);
+ CloseValidationIterator<String> iterator = new
CloseValidationIterator<>(Collections.singletonList("value").iterator());
+ assertEquals(1, input.mapPartitions(partition -> iterator,
true).collectAsList().size());
+ assertTrue(iterator.isClosed());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testFlatMapWithCloseable(boolean isLazy) {
+ String partition1 = "partition1";
+ String partition2 = "partition2";
+ CloseValidationIterator<String> iterator1 = new
CloseValidationIterator<>(Collections.singletonList("value").iterator());
+ CloseValidationIterator<String> iterator2 = new
CloseValidationIterator<>(Collections.singletonList("value").iterator());
+ HoodieData<String> input = new HoodieListData<>(Stream.of(partition1,
partition2), isLazy);
+ assertEquals(2, input.flatMap(partition -> partition.equals(partition1) ?
iterator1 : iterator2).collectAsList().size());
+ assertTrue(iterator1.isClosed());
+ assertTrue(iterator2.isClosed());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testFlatMapToPairWithCloseable(boolean isLazy) {
+ String partition1 = "partition1";
+ String partition2 = "partition2";
+ HoodieData<String> input = new HoodieListData<>(Stream.of(partition1,
partition2), isLazy);
+ CloseValidationIterator<Pair<String, String>> iterator1 = new
CloseValidationIterator<>(Collections.singletonList(Pair.of("1",
"value")).iterator());
+ CloseValidationIterator<Pair<String, String>> iterator2 = new
CloseValidationIterator<>(Collections.singletonList(Pair.of("2",
"value")).iterator());
+ assertEquals(2, input.flatMapToPair(partition ->
partition.equals(partition1) ? iterator1 : iterator2).collectAsList().size());
+ assertTrue(iterator1.isClosed());
+ assertTrue(iterator2.isClosed());
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
index 8355a5f30ed..d0bda7715a6 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -42,6 +43,7 @@ import java.util.stream.StreamSupport;
import static org.apache.hudi.common.util.CollectionUtils.createImmutableList;
import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests {@link HoodieListPairData}.
@@ -149,6 +151,50 @@ public class TestHoodieListDataPairData {
assertEquals(expected, toMap(reduced));
}
+ @Test
+ void testReduceByKeyWithCloseableInput() {
+ List<CloseValidationIterator<Pair<Integer, Integer>>> createdIterators =
new ArrayList<>();
+ HoodiePairData<Integer, Integer> data =
HoodieListData.lazy(Arrays.asList(1, 1, 1))
+ .flatMapToPair(key -> {
+ CloseValidationIterator<Pair<Integer, Integer>> iter = new
CloseValidationIterator<>(Collections.singletonList(Pair.of(key,
1)).iterator());
+ createdIterators.add(iter);
+ return iter;
+ });
+ List<Pair<Integer, Integer>> result = data.reduceByKey(Integer::sum,
1).collectAsList();
+ assertEquals(Collections.singletonList(Pair.of(1, 3)), result);
+ createdIterators.forEach(iter -> assertTrue(iter.isClosed()));
+ }
+
+ @Test
+ void testLeftOuterJoinWithCloseableInput() {
+ List<CloseValidationIterator<Pair<Integer, Integer>>> createdIterators =
new ArrayList<>();
+ HoodiePairData<Integer, Integer> dataToJoin =
HoodieListData.lazy(Arrays.asList(1, 2, 3))
+ .flatMapToPair(key -> {
+ CloseValidationIterator<Pair<Integer, Integer>> iter = new
CloseValidationIterator<>(Collections.singletonList(Pair.of(key,
1)).iterator());
+ createdIterators.add(iter);
+ return iter;
+ });
+ HoodiePairData<Integer, Integer> data =
HoodieListPairData.lazy(Arrays.asList(Pair.of(1, 1), Pair.of(4, 2)));
+ List<Pair<Integer, Pair<Integer, Option<Integer>>>> result =
data.leftOuterJoin(dataToJoin).collectAsList();
+ assertEquals(2, result.size());
+ createdIterators.forEach(iter -> assertTrue(iter.isClosed()));
+ }
+
+ @Test
+ void testJoinWithCloseableInput() {
+ List<CloseValidationIterator<Pair<Integer, Integer>>> createdIterators =
new ArrayList<>();
+ HoodiePairData<Integer, Integer> dataToJoin =
HoodieListData.lazy(Arrays.asList(1, 2, 3))
+ .flatMapToPair(key -> {
+ CloseValidationIterator<Pair<Integer, Integer>> iter = new
CloseValidationIterator<>(Collections.singletonList(Pair.of(key,
1)).iterator());
+ createdIterators.add(iter);
+ return iter;
+ });
+ HoodiePairData<Integer, Integer> data =
HoodieListPairData.lazy(Arrays.asList(Pair.of(1, 1), Pair.of(4, 2)));
+ List<Pair<Integer, Pair<Integer, Integer>>> result =
data.join(dataToJoin).collectAsList();
+ assertEquals(1, result.size());
+ createdIterators.forEach(iter -> assertTrue(iter.isClosed()));
+ }
+
@Test
public void testLeftOuterJoinSingleValuePerKey() {
HoodiePairData<String, String> pairData1 =
HoodieListPairData.lazy(Arrays.asList(
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
index f34034d0a35..e0d307e2f03 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
@@ -42,7 +43,6 @@ import org.junit.jupiter.params.provider.CsvSource;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
@@ -107,15 +107,16 @@ public class TestHoodieHBaseHFileReaderWriter extends
TestHoodieHFileReaderWrite
(entry.get("_row_key").toString()).contains("key05")
|| (entry.get("_row_key").toString()).contains("key24")
||
(entry.get("_row_key").toString()).contains("key31"))).collect(Collectors.toList());
- Iterator<IndexedRecord> iterator =
+ try (ClosableIterator<IndexedRecord> iterator =
hfileReader.getIndexedRecordsByKeysIterator(
Arrays.asList("key00001", "key05", "key24", "key16", "key31",
"key61"),
- avroSchema);
- List<GenericRecord> recordsByKeys =
- StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator,
Spliterator.ORDERED), false)
- .map(r -> (GenericRecord) r)
- .collect(Collectors.toList());
- assertEquals(expectedKey1s, recordsByKeys);
+ avroSchema)) {
+ List<GenericRecord> recordsByKeys =
+ StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator,
Spliterator.ORDERED), false)
+ .map(r -> (GenericRecord) r)
+ .collect(Collectors.toList());
+ assertEquals(expectedKey1s, recordsByKeys);
+ }
}
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
index 362f58a00cf..1044604419a 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader;
@@ -32,7 +33,6 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
@@ -78,15 +78,16 @@ public class TestHoodieHFileReaderWriter extends
TestHoodieHFileReaderWriterBase
// Even though key16 exists, it's a backward seek not in order.
// Our native HFile reader does not allow backward seek, and throws an
exception
// Note that backward seek is not expected to happen in production code
- Iterator<IndexedRecord> iterator =
+ try (ClosableIterator<IndexedRecord> iterator =
hfileReader.getIndexedRecordsByKeysIterator(
Arrays.asList("key00001", "key05", "key24", "key16", "key31",
"key61"),
- avroSchema);
- assertThrows(
- IllegalStateException.class,
- () -> StreamSupport.stream(
- Spliterators.spliteratorUnknownSize(iterator,
Spliterator.ORDERED), false)
- .collect(Collectors.toList()));
+ avroSchema)) {
+ assertThrows(
+ IllegalStateException.class,
+ () -> StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(iterator,
Spliterator.ORDERED), false)
+ .collect(Collectors.toList()));
+ }
}
}
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestOrcReaderIterator.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestOrcReaderIterator.java
index 4cf6f7c27c7..698f44597ce 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestOrcReaderIterator.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestOrcReaderIterator.java
@@ -20,6 +20,7 @@
package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.util.AvroOrcUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -39,7 +40,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
-import java.util.Iterator;
import static
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
@@ -66,33 +66,34 @@ public class TestOrcReaderIterator {
Schema avroSchema = getSchemaFromResource(TestOrcReaderIterator.class,
"/simple-test.avsc");
TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema);
OrcFile.WriterOptions options =
OrcFile.writerOptions(conf).setSchema(orcSchema).compress(CompressionKind.ZLIB);
- Writer writer = OrcFile.createWriter(filePath, options);
- VectorizedRowBatch batch = orcSchema.createRowBatch();
- BytesColumnVector nameColumns = (BytesColumnVector) batch.cols[0];
- LongColumnVector numberColumns = (LongColumnVector) batch.cols[1];
- BytesColumnVector colorColumns = (BytesColumnVector) batch.cols[2];
- for (int r = 0; r < 5; ++r) {
- int row = batch.size++;
- byte[] name = getUTF8Bytes("name" + r);
- nameColumns.setVal(row, name);
- byte[] color = getUTF8Bytes("color" + r);
- colorColumns.setVal(row, color);
- numberColumns.vector[row] = r;
+ try (Writer writer = OrcFile.createWriter(filePath, options)) {
+ VectorizedRowBatch batch = orcSchema.createRowBatch();
+ BytesColumnVector nameColumns = (BytesColumnVector) batch.cols[0];
+ LongColumnVector numberColumns = (LongColumnVector) batch.cols[1];
+ BytesColumnVector colorColumns = (BytesColumnVector) batch.cols[2];
+ for (int r = 0; r < 5; ++r) {
+ int row = batch.size++;
+ byte[] name = getUTF8Bytes("name" + r);
+ nameColumns.setVal(row, name);
+ byte[] color = getUTF8Bytes("color" + r);
+ colorColumns.setVal(row, color);
+ numberColumns.vector[row] = r;
+ }
+ writer.addRowBatch(batch);
}
- writer.addRowBatch(batch);
- writer.close();
Reader reader = OrcFile.createReader(filePath,
OrcFile.readerOptions(conf));
RecordReader recordReader = reader.rows(new
Reader.Options(conf).schema(orcSchema));
- Iterator<GenericRecord> iterator = new OrcReaderIterator<>(recordReader,
avroSchema, orcSchema);
- int recordCount = 0;
- while (iterator.hasNext()) {
- GenericRecord record = iterator.next();
- assertEquals("name" + recordCount, record.get("name").toString());
- assertEquals("color" + recordCount,
record.get("favorite_color").toString());
- assertEquals(recordCount, record.get("favorite_number"));
- recordCount++;
+ try (ClosableIterator<GenericRecord> iterator = new
OrcReaderIterator<>(recordReader, avroSchema, orcSchema)) {
+ int recordCount = 0;
+ while (iterator.hasNext()) {
+ GenericRecord record = iterator.next();
+ assertEquals("name" + recordCount, record.get("name").toString());
+ assertEquals("color" + recordCount,
record.get("favorite_color").toString());
+ assertEquals(recordCount, record.get("favorite_number"));
+ recordCount++;
+ }
+ assertEquals(5, recordCount);
}
- assertEquals(5, recordCount);
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
index c1b41061992..e54faaac1da 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
@@ -24,6 +24,7 @@ import
org.apache.hudi.DataSourceReadOptions.{REALTIME_PAYLOAD_COMBINE_OPT_VAL,
import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.data.CloseableIteratorListener
import org.apache.hudi.storage.StoragePath
import org.apache.hadoop.conf.Configuration
@@ -134,7 +135,7 @@ class HoodieMultipleBaseFileFormat(tableState:
Broadcast[HoodieTableState],
(file: PartitionedFile) => {
val filePath =
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
val fileFormat = detectFileFormat(filePath.toString)
- file.partitionValues match {
+ val iter = file.partitionValues match {
case fileSliceMapping: HoodiePartitionFileSliceMapping =>
if (FSUtils.isLogFile(filePath)) {
// no base file
@@ -192,6 +193,7 @@ class HoodieMultipleBaseFileFormat(tableState:
Broadcast[HoodieTableState],
case _ => throw new UnsupportedOperationException(s"Base file format
$fileFormat is not supported.")
}
}
+ CloseableIteratorListener.addListener(iter)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 4eef9a5aaec..d6989d9075c 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.config.{HoodieMemoryConfig,
TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.read.HoodieFileGroupReader
+import org.apache.hudi.data.CloseableIteratorListener
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.io.IOUtils
import org.apache.hudi.storage.StorageConfiguration
@@ -163,7 +164,7 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
(file: PartitionedFile) => {
val storageConf = new
HadoopStorageConfiguration(broadcastedStorageConf.value.value)
- file.partitionValues match {
+ val iter = file.partitionValues match {
// Snapshot or incremental queries.
case fileSliceMapping: HoodiePartitionFileSliceMapping =>
val fileGroupName = FSUtils.getFileIdFromFilePath(sparkAdapter
@@ -205,6 +206,7 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
readBaseFile(file, parquetFileReader.value, requestedSchema,
remainingPartitionSchema, fixedPartitionIndexes,
requiredSchema, partitionSchema, outputSchema, filters,
storageConf)
}
+ CloseableIteratorListener.addListener(iter)
}
}
@@ -270,6 +272,7 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
private def
makeCloseableFileGroupMappingRecordIterator(closeableFileGroupRecordIterator:
HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow],
mappingFunction:
Function[InternalRow, InternalRow]): Iterator[InternalRow] = {
+ CloseableIteratorListener.addListener(closeableFileGroupRecordIterator)
new Iterator[InternalRow] with Closeable {
override def hasNext: Boolean = closeableFileGroupRecordIterator.hasNext
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
index 24c4663583e..a1b39d03584 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
@@ -31,6 +31,7 @@ import
org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.util.{Option, ValidationUtils}
import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig}
import org.apache.hudi.config.HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE
+import org.apache.hudi.data.CloseableIteratorListener
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.index.bucket.partition.{PartitionBucketIndexCalculator,
PartitionBucketIndexUtils}
import org.apache.hudi.internal.schema.InternalSchema
@@ -242,6 +243,7 @@ class PartitionBucketIndexManager extends BaseProcedure
false)
fileGroupReader.initRecordIterators()
val iterator =
fileGroupReader.getClosableIterator.asInstanceOf[HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow]]
+ CloseableIteratorListener.addListener(iterator)
iterator.asScala
})
}