This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d275748e871 [improve][io] Use try-with-resources for some locations
(#24900)
d275748e871 is described below
commit d275748e8713ebea2e7712a8657d82deac2d6132
Author: AROP <[email protected]>
AuthorDate: Wed Oct 29 00:00:40 2025 +0800
[improve][io] Use try-with-resources for some locations (#24900)
Co-authored-by: xcx <[email protected]>
---
.../org/apache/pulsar/io/file/utils/GZipFiles.java | 4 ++--
.../pulsar/io/flume/source/AbstractSource.java | 23 +++++++++++-----------
2 files changed, 13 insertions(+), 14 deletions(-)
diff --git
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/GZipFiles.java
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/GZipFiles.java
index 858a499ee28..64c64d03bbf 100644
---
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/GZipFiles.java
+++
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/GZipFiles.java
@@ -42,8 +42,8 @@ public class GZipFiles {
* Returns true if the given file is a gzip file.
*/
public static boolean isGzip(File f) {
- try (InputStream input = new FileInputStream(f)) {
- PushbackInputStream pb = new PushbackInputStream(input, 2);
+ try (InputStream input = new FileInputStream(f);
+ PushbackInputStream pb = new PushbackInputStream(input, 2)) {
byte[] signature = new byte[2];
int len = pb.read(signature); //read the signature
pb.unread(signature, 0, len); //push back the signature to the
stream
diff --git
a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
index 5f3b1676aac..0b0a57cb0d4 100644
---
a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
+++
b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/source/AbstractSource.java
@@ -91,18 +91,17 @@ public abstract class AbstractSource<V> extends
PushSource<V> {
while (running) {
BlockingQueue<Map<String, Object>> blockingQueue =
SinkOfFlume.getQueue();
while (blockingQueue != null && !blockingQueue.isEmpty()) {
- ByteArrayOutputStream bos = new
ByteArrayOutputStream();
- ObjectOutput out = null;
- out = new ObjectOutputStream(bos);
- Map<String, Object> message = blockingQueue.take();
- out.writeObject(message.get("body"));
- out.flush();
- byte[] m = bos.toByteArray();
- String m1 = new String(m);
- bos.close();
- FlumeRecord flumeRecord = new FlumeRecord<>();
- flumeRecord.setRecord(extractValue(m1));
- consume(flumeRecord);
+ try (ByteArrayOutputStream bos = new
ByteArrayOutputStream();
+ ObjectOutput out = new ObjectOutputStream(bos)) {
+ Map<String, Object> message = blockingQueue.take();
+ out.writeObject(message.get("body"));
+ out.flush();
+ byte[] m = bos.toByteArray();
+ String m1 = new String(m);
+ FlumeRecord flumeRecord = new FlumeRecord<>();
+ flumeRecord.setRecord(extractValue(m1));
+ consume(flumeRecord);
+ }
}
}
} catch (Exception e) {