This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e0d3d4218a158ba8d76d582e014d7291a824b05c Author: ZhangJian He <[email protected]> AuthorDate: Sun Sep 26 09:23:38 2021 +0800 Remove the deprecated api usage in hdfs (#12080) ### Motivation Remove the deprecated api usage in hdfs ### Modifications Use try with resources instead of closeQuitely Remove the Long constructor * Remove the deprecated api usage in hdfs * Update pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java Co-authored-by: Yunze Xu <[email protected]> Co-authored-by: Yunze Xu <[email protected]> (cherry picked from commit f8220166a1b9af09a35d926adde90955be225af1) --- .../java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java | 7 +------ .../apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java | 2 +- .../java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java | 7 +------ .../apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java | 2 +- 4 files changed, 4 insertions(+), 14 deletions(-) diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java index f1aab8b..456d0e5 100644 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java +++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicReference; import javax.net.SocketFactory; -import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -138,12 +137,8 @@ public abstract class AbstractHdfsConnector { } InetSocketAddress namenode = NetUtils.createSocketAddr(address, port); SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config); - Socket socket = null; - try { - socket = socketFactory.createSocket(); + try (Socket socket = socketFactory.createSocket()) { NetUtils.connect(socket, namenode, 1000); // 1 second timeout - } finally { - IOUtils.closeQuietly(socket); } } diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java index 88aee08..a3687b8 100644 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java +++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java @@ -60,7 +60,7 @@ public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, S @Override public KeyValue<Long, String> extractKeyValue(Record<String> record) { - Long sequence = record.getRecordSequence().orElseGet(() -> new Long(counter.incrementAndGet())); + Long sequence = record.getRecordSequence().orElseGet(() -> counter.incrementAndGet()); return new KeyValue<>(sequence, record.getValue()); } diff --git a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java index 28e0bd4..dfe8833 100644 --- a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java +++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicReference; import javax.net.SocketFactory; -import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -138,12 +137,8 @@ public abstract class AbstractHdfsConnector { } InetSocketAddress namenode = NetUtils.createSocketAddr(address, port); SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config); - Socket socket = null; - try { - socket = socketFactory.createSocket(); + try (Socket socket = socketFactory.createSocket()){ NetUtils.connect(socket, namenode, 1000); // 1 second timeout - } finally { - IOUtils.closeQuietly(socket); } } diff --git a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java index cca5398..d63f360 100644 --- a/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java +++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java @@ -60,7 +60,7 @@ public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, S @Override public KeyValue<Long, String> extractKeyValue(Record<String> record) { - Long sequence = record.getRecordSequence().orElseGet(() -> new Long(counter.incrementAndGet())); + Long sequence = record.getRecordSequence().orElseGet(() -> counter.incrementAndGet()); return new KeyValue<>(sequence, record.getValue()); }
