This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d22bdc59843 [MINOR] Avoid resource leaks (#10345)
d22bdc59843 is described below
commit d22bdc5984387c0664fa43854f19b3988f5149fc
Author: Tim Brown <[email protected]>
AuthorDate: Wed Jan 10 17:06:00 2024 -0800
[MINOR] Avoid resource leaks (#10345)
---
.../main/java/org/apache/hudi/metrics/Metrics.java | 35 +++++++++++++++-------
.../hudi/testutils/TestHoodieMetadataBase.java | 2 +-
.../hudi/common/table/log/HoodieLogFileReader.java | 1 +
.../common/table/log/HoodieLogFormatWriter.java | 1 +
.../common/util/collection/LazyFileIterable.java | 9 +++++-
.../hudi/internal/schema/utils/SerDeHelper.java | 6 ++--
.../io/storage/HoodieBootstrapRecordIterator.java | 3 +-
.../hudi/common/testutils/SchemaTestUtil.java | 5 ++--
.../hudi/hadoop/TestHoodieHFileInputFormat.java | 1 +
.../hudi/hadoop/TestHoodieParquetInputFormat.java | 2 ++
.../realtime/TestHoodieRealtimeRecordReader.java | 3 ++
11 files changed, 49 insertions(+), 19 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
index 47ee23bcc2f..31b0d19da01 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -50,6 +50,7 @@ public class Metrics {
private final List<MetricsReporter> reporters;
private final String commonMetricPrefix;
private boolean initialized = false;
+ private transient Thread shutdownThread = null;
public Metrics(HoodieWriteConfig metricConfig) {
registry = new MetricRegistry();
@@ -65,7 +66,8 @@ public class Metrics {
}
reporters.forEach(MetricsReporter::start);
- Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
+ shutdownThread = new Thread(() -> shutdown(true));
+ Runtime.getRuntime().addShutdownHook(shutdownThread);
this.initialized = true;
}
@@ -112,16 +114,27 @@ public class Metrics {
return reporterList;
}
- public synchronized void shutdown() {
- try {
- registerHoodieCommonMetrics();
- reporters.forEach(MetricsReporter::report);
- LOG.info("Stopping the metrics reporter...");
- reporters.forEach(MetricsReporter::stop);
- } catch (Exception e) {
- LOG.warn("Error while closing reporter", e);
- } finally {
- initialized = false;
+ public void shutdown() {
+ shutdown(false);
+ }
+
+ private synchronized void shutdown(boolean fromShutdownHook) {
+ if (!fromShutdownHook) {
+ Runtime.getRuntime().removeShutdownHook(shutdownThread);
+ } else {
+ LOG.warn("Shutting down the metrics reporter from shutdown hook.");
+ }
+ if (initialized) {
+ try {
+ registerHoodieCommonMetrics();
+ reporters.forEach(MetricsReporter::report);
+ LOG.info("Stopping the metrics reporter...");
+ reporters.forEach(MetricsReporter::stop);
+ } catch (Exception e) {
+ LOG.warn("Error while closing reporter", e);
+ } finally {
+ initialized = false;
+ }
}
}
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
index 7f9e4712cff..336309deb23 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
@@ -296,7 +296,7 @@ public class TestHoodieMetadataBase extends
HoodieJavaClientTestHarness {
.withAutoClean(false).retainCommits(1).retainFileVersions(1)
.build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 *
1024 * 1024).build())
- .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
+ .withEmbeddedTimelineServerEnabled(false).forTable("test-trip-table")
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
.withEnableBackupForRemoteFileSystemView(false).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 6759650af78..764c919d2e1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -142,6 +142,7 @@ public class HoodieLogFileReader implements
HoodieLogFormat.Reader {
private void addShutDownHook() {
shutdownThread = new Thread(() -> {
try {
+ LOG.warn("Failed to properly close HoodieLogFileReader in
application.");
close();
} catch (Exception e) {
LOG.warn("unable to close input stream for log file " + logFile, e);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index f190b883c30..4e5082d4541 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -276,6 +276,7 @@ public class HoodieLogFormatWriter implements
HoodieLogFormat.Writer {
shutdownThread = new Thread() {
public void run() {
try {
+ LOG.warn("running logformatwriter hook");
if (output != null) {
close();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java
index 8e2210d61ee..799aa3d4d56 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/LazyFileIterable.java
@@ -21,6 +21,9 @@ package org.apache.hudi.common.util.collection;
import org.apache.hudi.common.util.BufferedRandomAccessFile;
import org.apache.hudi.exception.HoodieException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
@@ -32,6 +35,7 @@ import java.util.stream.Collectors;
* the latest value for a key spilled to disk and returns the result.
*/
public class LazyFileIterable<T, R> implements Iterable<R> {
+ private static final Logger LOG =
LoggerFactory.getLogger(LazyFileIterable.class);
// Used to access the value written at a specific position in the file
private final String filePath;
@@ -128,7 +132,10 @@ public class LazyFileIterable<T, R> implements Iterable<R>
{
}
private void addShutdownHook() {
- shutdownThread = new Thread(this::closeHandle);
+ shutdownThread = new Thread(() -> {
+ LOG.warn("Failed to properly close LazyFileIterable in application.");
+ this.closeHandle();
+ });
Runtime.getRuntime().addShutdownHook(shutdownThread);
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java
index f47d7f8da51..7891fc4582c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java
+++
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java
@@ -18,6 +18,7 @@
package org.apache.hudi.internal.schema.utils;
+import org.apache.hudi.common.util.JsonUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -28,7 +29,6 @@ import org.apache.hudi.internal.schema.Types;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.StringWriter;
@@ -295,7 +295,7 @@ public class SerDeHelper {
return Option.empty();
}
try {
- return Option.of(fromJson((new ObjectMapper(new
JsonFactory())).readValue(json, JsonNode.class)));
+ return Option.of(fromJson(JsonUtils.getObjectMapper().readTree(json)));
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -311,7 +311,7 @@ public class SerDeHelper {
public static TreeMap<Long, InternalSchema> parseSchemas(String json) {
TreeMap<Long, InternalSchema> result = new TreeMap<>();
try {
- JsonNode jsonNode = (new ObjectMapper(new
JsonFactory())).readValue(json, JsonNode.class);
+ JsonNode jsonNode = JsonUtils.getObjectMapper().readTree(json);
if (!jsonNode.has(SCHEMAS)) {
throw new IllegalArgumentException(String.format("cannot parser
schemas from current json string, missing key name: %s", SCHEMAS));
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java
index 43f2d1ad1ad..6fa398a8225 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapRecordIterator.java
@@ -50,7 +50,8 @@ public abstract class HoodieBootstrapRecordIterator<T>
implements ClosableIterat
@Override
public void close() {
-
+ skeletonIterator.close();
+ dataFileIterator.close();
}
@Override
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
index a96d53a273f..9ee16174973 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
@@ -37,6 +37,7 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.Utf8;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
@@ -271,8 +272,8 @@ public final class SchemaTestUtil {
}
public static Schema getSchemaFromResource(Class<?> clazz, String name,
boolean withHoodieMetadata) {
- try {
- Schema schema = new
Schema.Parser().parse(clazz.getResourceAsStream(name));
+ try (InputStream schemaInputStream = clazz.getResourceAsStream(name)) {
+ Schema schema = new Schema.Parser().parse(schemaInputStream);
return withHoodieMetadata ? HoodieAvroUtils.addMetadataFields(schema) :
schema;
} catch (IOException e) {
throw new RuntimeException(String.format("Failed to get schema from
resource `%s` for class `%s`", name, clazz.getName()));
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java
index a5260766bf1..ec6e3ee94dd 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHFileInputFormat.java
@@ -518,6 +518,7 @@ public class TestHoodieHFileInputFormat {
}
totalCount++;
}
+ recordReader.close();
}
assertEquals(expectedNumberOfRecordsInCommit, actualCount, msg);
assertEquals(totalExpected, totalCount, msg);
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 0b0a615c263..d71055079c2 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -766,6 +766,7 @@ public class TestHoodieParquetInputFormat {
}
totalCount++;
}
+ recordReader.close();
}
assertEquals(expectedNumberOfRecordsInCommit, actualCount, msg);
assertEquals(totalExpected, totalCount, msg);
@@ -821,6 +822,7 @@ public class TestHoodieParquetInputFormat {
// test date
assertEquals(LocalDate.ofEpochDay(testDate).toString(),
String.valueOf(writable.get()[2]));
}
+ recordReader.close();
}
}
}
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 15d983ee48b..350728b9d64 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -510,6 +510,7 @@ public class TestHoodieRealtimeRecordReader {
}
reader.close();
}
+ recordReader.close();
}
@ParameterizedTest
@@ -593,6 +594,7 @@ public class TestHoodieRealtimeRecordReader {
while (recordReader.next(key, value)) {
// keep reading
}
+ recordReader.close();
reader.close();
}
@@ -650,6 +652,7 @@ public class TestHoodieRealtimeRecordReader {
while (recordReader.next(key, value)) {
// keep reading
}
+ recordReader.close();
reader.close();
}