This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new ba82bf4c784 [fix][sec] Drop hdfs2 support, Upgrade hadoop3 to 3.4.0 
and dnsjava to 3.6.2 to address CVE-2024-25638 (#23411)
ba82bf4c784 is described below

commit ba82bf4c78467c0716c7464a7fb7b30dd37cfaea
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Oct 11 19:22:35 2024 +0300

    [fix][sec] Drop hdfs2 support, Upgrade hadoop3 to 3.4.0 and dnsjava to 
3.6.2 to address CVE-2024-25638 (#23411)
---
 .github/workflows/pulsar-ci.yaml                   |   2 +-
 deployment/terraform-ansible/deploy-pulsar.yaml    |   1 -
 distribution/io/src/assemble/io.xml                |   1 -
 pom.xml                                            |  60 ++++-
 pulsar-bom/pom.xml                                 |   5 -
 pulsar-io/docs/pom.xml                             |   5 -
 pulsar-io/hdfs2/pom.xml                            | 130 -----------
 .../apache/pulsar/io/hdfs2/AbstractHdfsConfig.java |  76 -------
 .../pulsar/io/hdfs2/AbstractHdfsConnector.java     | 246 ---------------------
 .../org/apache/pulsar/io/hdfs2/Compression.java    |  26 ---
 .../org/apache/pulsar/io/hdfs2/HdfsResources.java  |  51 -----
 .../org/apache/pulsar/io/hdfs2/SecurityUtil.java   |  90 --------
 .../org/apache/pulsar/io/hdfs2/package-info.java   |  19 --
 .../pulsar/io/hdfs2/sink/HdfsAbstractSink.java     | 124 -----------
 .../pulsar/io/hdfs2/sink/HdfsSinkConfig.java       | 117 ----------
 .../pulsar/io/hdfs2/sink/HdfsSyncThread.java       |  79 -------
 .../apache/pulsar/io/hdfs2/sink/package-info.java  |  19 --
 .../sink/seq/HdfsAbstractSequenceFileSink.java     |  95 --------
 .../io/hdfs2/sink/seq/HdfsSequentialTextSink.java  |  70 ------
 .../pulsar/io/hdfs2/sink/seq/HdfsTextSink.java     |  53 -----
 .../pulsar/io/hdfs2/sink/seq/package-info.java     |  19 --
 .../hdfs2/sink/text/HdfsAbstractTextFileSink.java  |  78 -------
 .../pulsar/io/hdfs2/sink/text/HdfsStringSink.java  |  34 ---
 .../pulsar/io/hdfs2/sink/text/package-info.java    |  19 --
 .../resources/META-INF/services/pulsar-io.yaml     |  23 --
 .../hdfs2/src/main/resources/findbugsExclude.xml   |  58 -----
 .../pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java | 120 ----------
 .../pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java  | 158 -------------
 .../io/hdfs2/sink/seq/HdfsSequentialSinkTests.java | 110 ---------
 .../io/hdfs2/sink/seq/HdfsTextSinkTests.java       | 122 ----------
 .../io/hdfs2/sink/text/HdfsStringSinkTests.java    | 118 ----------
 .../hdfs2/src/test/resources/hadoop/core-site.xml  |  32 ---
 .../hdfs2/src/test/resources/hadoop/hdfs-site.xml  |  34 ---
 pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml |  26 ---
 pulsar-io/hdfs3/pom.xml                            |  50 ++---
 pulsar-io/pom.xml                                  |   2 -
 tiered-storage/file-system/pom.xml                 |  12 +
 37 files changed, 94 insertions(+), 2190 deletions(-)

diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index bf44c51b6ad..87d8cd7cf9a 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -1498,7 +1498,7 @@ jobs:
       - name: trigger dependency check
         run: |
           mvn -B -ntp verify -PskipDocker,skip-all,owasp-dependency-check 
-Dcheckstyle.skip=true -DskipTests \
-            -pl 
'!distribution/server,!distribution/io,!distribution/offloaders,!tiered-storage/file-system,!pulsar-io/flume,!pulsar-io/hbase,!pulsar-io/hdfs2,!pulsar-io/hdfs3,!pulsar-io/docs,!pulsar-io/jdbc/openmldb'
+            -pl 
'!distribution/server,!distribution/io,!distribution/offloaders,!tiered-storage/file-system,!pulsar-io/flume,!pulsar-io/hbase,!pulsar-io/hdfs3,!pulsar-io/docs,!pulsar-io/jdbc/openmldb'
 
       - name: Upload report
         uses: actions/upload-artifact@v4
diff --git a/deployment/terraform-ansible/deploy-pulsar.yaml 
b/deployment/terraform-ansible/deploy-pulsar.yaml
index db2fd1257ca..3a9f0fd942c 100644
--- a/deployment/terraform-ansible/deploy-pulsar.yaml
+++ b/deployment/terraform-ansible/deploy-pulsar.yaml
@@ -147,7 +147,6 @@
 #        - file
 #        - flume
 #        - hbase
-#        - hdfs2
 #        - hdfs3
 #        - influxdb
 #        - jdbc-clickhouse
diff --git a/distribution/io/src/assemble/io.xml 
b/distribution/io/src/assemble/io.xml
index f98ee14bb20..cf7731b4c85 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -63,7 +63,6 @@
     
<file><source>${basedir}/../../pulsar-io/kafka-connect-adaptor-nar/target/pulsar-io-kafka-connect-adaptor-${project.version}.nar</source></file>
     
<file><source>${basedir}/../../pulsar-io/hbase/target/pulsar-io-hbase-${project.version}.nar</source></file>
     
<file><source>${basedir}/../../pulsar-io/kinesis/target/pulsar-io-kinesis-${project.version}.nar</source></file>
-    
<file><source>${basedir}/../../pulsar-io/hdfs2/target/pulsar-io-hdfs2-${project.version}.nar</source></file>
     
<file><source>${basedir}/../../pulsar-io/hdfs3/target/pulsar-io-hdfs3-${project.version}.nar</source></file>
     
<file><source>${basedir}/../../pulsar-io/file/target/pulsar-io-file-${project.version}.nar</source></file>
     
<file><source>${basedir}/../../pulsar-io/data-generator/target/pulsar-io-data-generator-${project.version}.nar</source></file>
diff --git a/pom.xml b/pom.xml
index f99eb3066d5..b89dd1597cc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,7 +196,6 @@ flexible messaging model and an intuitive client 
API.</description>
     <clickhouse-jdbc.version>0.4.6</clickhouse-jdbc.version>
     <mariadb-jdbc.version>2.7.5</mariadb-jdbc.version>
     <openmldb-jdbc.version>0.4.4-hotfix1</openmldb-jdbc.version>
-    <hdfs-offload-version3>3.3.5</hdfs-offload-version3>
     <json-smart.version>2.4.10</json-smart.version>
     <opensearch.version>2.16.0</opensearch.version>
     <elasticsearch-java.version>8.12.1</elasticsearch-java.version>
@@ -207,9 +206,10 @@ flexible messaging model and an intuitive client 
API.</description>
     <wildfly-elytron.version>1.15.16.Final</wildfly-elytron.version>
     <jsonwebtoken.version>0.11.1</jsonwebtoken.version>
     <opencensus.version>0.28.0</opencensus.version>
-    <hadoop2.version>2.10.2</hadoop2.version>
-    <hadoop3.version>3.3.5</hadoop3.version>
-    <hbase.version>2.4.16</hbase.version>
+    <hadoop3.version>3.4.0</hadoop3.version>
+    <dnsjava3.version>3.6.2</dnsjava3.version>
+    <hdfs-offload-version3>${hadoop3.version}</hdfs-offload-version3>
+    <hbase.version>2.6.0-hadoop3</hbase.version>
     <guava.version>32.1.2-jre</guava.version>
     <jcip.version>1.0</jcip.version>
     <prometheus-jmx.version>0.16.1</prometheus-jmx.version>
@@ -1313,6 +1313,58 @@ flexible messaging model and an intuitive client 
API.</description>
         <version>${commons.collections4.version}</version>
       </dependency>
 
+      <!-- support only hadoop 3 since hadoop 2 is EOL -->
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-common</artifactId>
+        <version>${hadoop3.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>dnsjava</groupId>
+            <artifactId>dnsjava</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-auth</artifactId>
+        <version>${hadoop3.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>dnsjava</groupId>
+            <artifactId>dnsjava</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-client</artifactId>
+        <version>${hadoop3.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>dnsjava</groupId>
+            <artifactId>dnsjava</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-client</artifactId>
+        <version>${hbase.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>dnsjava</groupId>
+            <artifactId>dnsjava</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <!-- dnsjava is pulled in by hadoop-common -->
+      <dependency>
+        <groupId>dnsjava</groupId>
+        <artifactId>dnsjava</artifactId>
+        <version>${dnsjava3.version}</version>
+      </dependency>
+
       <!-- test dependencies -->
       <dependency>
         <groupId>com.lmax</groupId>
diff --git a/pulsar-bom/pom.xml b/pulsar-bom/pom.xml
index d195411fa64..e674301f18a 100644
--- a/pulsar-bom/pom.xml
+++ b/pulsar-bom/pom.xml
@@ -495,11 +495,6 @@
         <artifactId>pulsar-io-hbase</artifactId>
         <version>${project.version}</version>
       </dependency>
-      <dependency>
-        <groupId>org.apache.pulsar</groupId>
-        <artifactId>pulsar-io-hdfs2</artifactId>
-        <version>${project.version}</version>
-      </dependency>
       <dependency>
         <groupId>org.apache.pulsar</groupId>
         <artifactId>pulsar-io-hdfs3</artifactId>
diff --git a/pulsar-io/docs/pom.xml b/pulsar-io/docs/pom.xml
index ac4ae9496d1..e373db26c45 100644
--- a/pulsar-io/docs/pom.xml
+++ b/pulsar-io/docs/pom.xml
@@ -127,11 +127,6 @@
       <artifactId>pulsar-io-hbase</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-io-hdfs2</artifactId>
-      <version>${project.version}</version>
-    </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-hdfs3</artifactId>
diff --git a/pulsar-io/hdfs2/pom.xml b/pulsar-io/hdfs2/pom.xml
deleted file mode 100644
index d5fb33c170d..00000000000
--- a/pulsar-io/hdfs2/pom.xml
+++ /dev/null
@@ -1,130 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar-io</artifactId>
-    <version>4.0.0-SNAPSHOT</version>
-  </parent>
-  <artifactId>pulsar-io-hdfs2</artifactId>
-  <name>Pulsar IO :: Hdfs2</name>
-
-    <dependencies>
-     <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-io-core</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.dataformat</groupId>
-      <artifactId>jackson-dataformat-yaml</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-collections4</artifactId>
-    </dependency>
-
-       <dependency>
-               <groupId>org.apache.hadoop</groupId>
-               <artifactId>hadoop-client</artifactId>
-               <version>${hadoop2.version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.slf4j</groupId>
-            <artifactId>*</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro</artifactId>
-          </exclusion>
-        </exclusions>
-       </dependency>
-    <dependency>
-       <groupId>org.apache.commons</groupId>
-          <artifactId>commons-lang3</artifactId>
-    </dependency>
- </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-nar-maven-plugin</artifactId>
-      </plugin>
-      <plugin>
-        <groupId>com.github.spotbugs</groupId>
-        <artifactId>spotbugs-maven-plugin</artifactId>
-        <version>${spotbugs-maven-plugin.version}</version>
-        <configuration>
-            
<excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
-        </configuration>
-        <executions>
-            <execution>
-                <id>spotbugs</id>
-                <phase>verify</phase>
-                <goals>
-                    <goal>check</goal>
-                </goals>
-            </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-    <profiles>
-        <!--
-        The only working way for OWASP dependency checker plugin
-        to exclude module when failBuildOnCVSS is used
-        in the root pom's plugin.
-        -->
-        <profile>
-            <id>owasp-dependency-check</id>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>org.owasp</groupId>
-                        <artifactId>dependency-check-maven</artifactId>
-                        <executions>
-                            <execution>
-                                <goals>
-                                    <goal>aggregate</goal>
-                                </goals>
-                                <phase>none</phase>
-                            </execution>
-                        </executions>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-    </profiles>
-
-</project>
\ No newline at end of file
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConfig.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConfig.java
deleted file mode 100644
index 757360e0453..00000000000
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConfig.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2;
-
-import java.io.Serializable;
-import lombok.Data;
-import lombok.experimental.Accessors;
-import org.apache.commons.lang.StringUtils;
-
-/**
- * Configuration object for all HDFS components.
- */
-@Data
-@Accessors(chain = true)
-public abstract class AbstractHdfsConfig implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * A file or comma separated list of files which contains the Hadoop file 
system configuration,
-     * e.g. 'core-site.xml', 'hdfs-site.xml'.
-     */
-    private String hdfsConfigResources;
-
-    /**
-     * The HDFS directory from which files should be read from or written to.
-     */
-    private String directory;
-
-    /**
-     * The character encoding for the files, e.g. UTF-8, ASCII, etc.
-     */
-    private String encoding;
-
-    /**
-     * The compression codec used to compress/de-compress the files on HDFS.
-     */
-    private Compression compression;
-
-    /**
-     * The Kerberos user principal account to use for authentication.
-     */
-    private String kerberosUserPrincipal;
-
-    /**
-     * The full pathname to the Kerberos keytab file to use for authentication.
-     */
-    private String keytab;
-
-    public void validate() {
-        if (StringUtils.isEmpty(hdfsConfigResources) || 
StringUtils.isEmpty(directory)) {
-           throw new IllegalArgumentException("Required property not set.");
-        }
-
-        if ((StringUtils.isNotEmpty(kerberosUserPrincipal) && 
StringUtils.isEmpty(keytab))
-            || (StringUtils.isEmpty(kerberosUserPrincipal) && 
StringUtils.isNotEmpty(keytab))) {
-          throw new IllegalArgumentException("Values for both 
kerberosUserPrincipal & keytab are required.");
-        }
-    }
-}
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
deleted file mode 100644
index d7277aa6273..00000000000
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2;
-
-import java.io.IOException;
-import java.lang.ref.WeakReference;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.URI;
-import java.nio.charset.Charset;
-import java.security.PrivilegedExceptionAction;
-import java.util.Collections;
-import java.util.Map;
-import java.util.WeakHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.net.SocketFactory;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.pulsar.io.hdfs2.sink.HdfsSinkConfig;
-
-/**
- * A Simple abstract class for HDFS connectors.
- * Provides methods for connecting to HDFS
- */
-public abstract class AbstractHdfsConnector {
-
-    private static final Object RESOURCES_LOCK = new Object();
-
-    // Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
-    protected final AtomicReference<HdfsResources> hdfsResources = new 
AtomicReference<>();
-    protected AbstractHdfsConfig connectorConfig;
-    protected CompressionCodecFactory compressionCodecFactory;
-
-    public AbstractHdfsConnector() {
-       hdfsResources.set(new HdfsResources(null, null, null));
-    }
-
-    /*
-     * Reset Hadoop Configuration and FileSystem based on the supplied 
configuration resources.
-     */
-    protected HdfsResources resetHDFSResources(HdfsSinkConfig hdfsSinkConfig) 
throws IOException {
-        Configuration config = new ExtendedConfiguration();
-        config.setClassLoader(Thread.currentThread().getContextClassLoader());
-
-        getConfig(config, connectorConfig.getHdfsConfigResources());
-
-        // first check for timeout on HDFS connection, because FileSystem has 
a hard coded 15 minute timeout
-        checkHdfsUriForTimeout(config);
-
-        /* Disable caching of Configuration and FileSystem objects, else we 
cannot reconfigure
-         * the processor without a complete restart
-         */
-        String disableCacheName = String.format("fs.%s.impl.disable.cache",
-                FileSystem.getDefaultUri(config).getScheme());
-        config.set(disableCacheName, "true");
-
-        // If kerberos is enabled, create the file system as the kerberos 
principal
-        // -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed 
by only a single thread at at time
-        FileSystem fs;
-        UserGroupInformation ugi;
-        synchronized (RESOURCES_LOCK) {
-            if (SecurityUtil.isSecurityEnabled(config)) {
-                ugi = SecurityUtil.loginKerberos(config,
-                        connectorConfig.getKerberosUserPrincipal(), 
connectorConfig.getKeytab());
-                fs = getFileSystemAsUser(config, ugi);
-            } else {
-                config.set("ipc.client.fallback-to-simple-auth-allowed", 
"true");
-                config.set("hadoop.security.authentication", "simple");
-                ugi = SecurityUtil.loginSimple(config);
-                fs = getFileSystemAsUser(config, ugi);
-            }
-        }
-        return new HdfsResources(config, fs, ugi);
-    }
-
-    private static Configuration getConfig(final Configuration config, String 
res) throws IOException {
-        boolean foundResources = false;
-        if (null != res) {
-            String[] resources = res.split(",");
-            for (String resource : resources) {
-                config.addResource(new Path(resource.trim()));
-                foundResources = true;
-            }
-        }
-
-        if (!foundResources) {
-            // check that at least 1 non-default resource is available on the 
classpath
-            String configStr = config.toString();
-            for (String resource : configStr.substring(configStr.indexOf(":") 
+ 1).split(",")) {
-                if (!resource.contains("default") && 
config.getResource(resource.trim()) != null) {
-                    foundResources = true;
-                    break;
-                }
-            }
-        }
-
-        if (!foundResources) {
-            throw new IOException("Could not find any of the " + res + " on 
the classpath");
-        }
-        return config;
-    }
-
-    /*
-     * Reduce the timeout of a socket connection from the default in 
FileSystem.get()
-     */
-    protected void checkHdfsUriForTimeout(Configuration config) throws 
IOException {
-        URI hdfsUri = FileSystem.getDefaultUri(config);
-        String address = hdfsUri.getAuthority();
-        int port = hdfsUri.getPort();
-        if (address == null || address.isEmpty() || port < 0) {
-            return;
-        }
-        InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
-        SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
-        try (Socket socket = socketFactory.createSocket()) {
-            NetUtils.connect(socket, namenode, 1000); // 1 second timeout
-        }
-    }
-
-    /**
-     * This exists in order to allow unit tests to override it so that they 
don't take several
-     * minutes waiting for UDP packets to be received.
-     *
-     * @param config
-     *            the configuration to use
-     * @return the FileSystem that is created for the given Configuration
-     * @throws IOException
-     *             if unable to create the FileSystem
-     */
-    protected FileSystem getFileSystem(final Configuration config) throws 
IOException {
-        return FileSystem.get(config);
-    }
-
-    protected FileSystem getFileSystemAsUser(final Configuration config, 
UserGroupInformation ugi) throws IOException {
-        try {
-            return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> 
FileSystem.get(config));
-        } catch (InterruptedException e) {
-            throw new IOException("Unable to create file system: " + 
e.getMessage());
-        }
-    }
-
-    protected Configuration getConfiguration() {
-        return hdfsResources.get().getConfiguration();
-    }
-
-    protected FileSystem getFileSystem() {
-        return hdfsResources.get().getFileSystem();
-    }
-
-    protected UserGroupInformation getUserGroupInformation() {
-        return hdfsResources.get().getUserGroupInformation();
-    }
-
-    protected String getEncoding() {
-        return StringUtils.isNotBlank(connectorConfig.getEncoding())
-                   ? connectorConfig.getEncoding() : 
Charset.defaultCharset().name();
-    }
-
-    protected CompressionCodec getCompressionCodec() {
-       if (connectorConfig.getCompression() == null) {
-           return null;
-       }
-
-       CompressionCodec codec = getCompressionCodecFactory()
-               .getCodecByName(connectorConfig.getCompression().name());
-
-       return (codec != null) ? codec : new DefaultCodec();
-    }
-
-    protected CompressionCodecFactory getCompressionCodecFactory() {
-        if (compressionCodecFactory == null) {
-            compressionCodecFactory = new 
CompressionCodecFactory(getConfiguration());
-        }
-
-        return compressionCodecFactory;
-    }
-
-    /**
-     * Extending Hadoop Configuration to prevent it from caching classes that 
can't be found. Since users may be
-     * adding additional JARs to the classpath we don't want them to have to 
restart the JVM to be able to load
-     * something that was previously not found, but might now be available.
-     * Reference the original getClassByNameOrNull from Configuration.
-     */
-    static class ExtendedConfiguration extends Configuration {
-
-        private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> 
cacheClasses = new WeakHashMap<>();
-
-        @Override
-        public Class<?> getClassByNameOrNull(String name) {
-            final ClassLoader classLoader = getClassLoader();
-
-            Map<String, WeakReference<Class<?>>> map;
-            synchronized (cacheClasses) {
-                map = cacheClasses.get(classLoader);
-                if (map == null) {
-                    map = Collections.synchronizedMap(new WeakHashMap<>());
-                    cacheClasses.put(classLoader, map);
-                }
-            }
-
-            Class<?> clazz = null;
-            WeakReference<Class<?>> ref = map.get(name);
-            if (ref != null) {
-                clazz = ref.get();
-            }
-
-            if (clazz == null) {
-                try {
-                    clazz = Class.forName(name, true, classLoader);
-                } catch (ClassNotFoundException | NoClassDefFoundError e) {
-                    return null;
-                }
-                // two putters can race here, but they'll put the same class
-                map.put(name, new WeakReference<>(clazz));
-                return clazz;
-            } else {
-                // cache hit
-                return clazz;
-            }
-        }
-
-    }
-}
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/Compression.java 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/Compression.java
deleted file mode 100644
index 1e3d2f94904..00000000000
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/Compression.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2;
-
-/**
- * An enumeration of compression codecs available for HDFS.
- */
-public enum Compression {
-    BZIP2, DEFLATE, GZIP, LZ4, SNAPPY, ZSTANDARD
-}
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/HdfsResources.java 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/HdfsResources.java
deleted file mode 100644
index 5fd6b283e6b..00000000000
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/HdfsResources.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * A wrapper class for HDFS resources.
- */
-public class HdfsResources {
-
-    private final Configuration configuration;
-    private final FileSystem fileSystem;
-    private final UserGroupInformation userGroupInformation;
-
-    public HdfsResources(Configuration config, FileSystem fs, 
UserGroupInformation ugi) {
-        this.configuration = config;
-        this.fileSystem = fs;
-        this.userGroupInformation = ugi;
-    }
-
-    public Configuration getConfiguration() {
-        return configuration;
-    }
-
-    public FileSystem getFileSystem() {
-        return fileSystem;
-    }
-
-    public UserGroupInformation getUserGroupInformation() {
-        return userGroupInformation;
-    }
-}
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/SecurityUtil.java 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/SecurityUtil.java
deleted file mode 100644
index ca178aad911..00000000000
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/SecurityUtil.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2;
-
-import java.io.IOException;
-import org.apache.commons.lang3.Validate;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * Provides synchronized access to UserGroupInformation to avoid multiple 
processors/services from
- * interfering with each other.
- */
-public class SecurityUtil {
-    public static final String HADOOP_SECURITY_AUTHENTICATION = 
"hadoop.security.authentication";
-    public static final String KERBEROS = "kerberos";
-
-    /**
-     *  Initializes UserGroupInformation with the given Configuration and 
performs the login for the
-     *  given principal and keytab. All logins should happen through this 
class to ensure other threads
-     *  are not concurrently modifying UserGroupInformation.
-     * <p/>
-     * @param config the configuration instance
-     * @param principal the principal to authenticate as
-     * @param keyTab the keytab to authenticate with
-     *
-     * @return the UGI for the given principal
-     *
-     * @throws IOException if login failed
-     */
-    public static synchronized UserGroupInformation loginKerberos(final 
Configuration config,
-            final String principal, final String keyTab) throws IOException {
-        Validate.notNull(config);
-        Validate.notNull(principal);
-        Validate.notNull(keyTab);
-
-        UserGroupInformation.setConfiguration(config);
-        UserGroupInformation.loginUserFromKeytab(principal.trim(), 
keyTab.trim());
-        return UserGroupInformation.getCurrentUser();
-    }
-
-    /**
-     * Initializes UserGroupInformation with the given Configuration and
-     * returns UserGroupInformation.getLoginUser(). All logins should happen
-     * through this class to ensure other threads are not concurrently
-     * modifying UserGroupInformation.
-     *
-     * @param config the configuration instance
-     *
-     * @return the UGI for the given principal
-     *
-     * @throws IOException if login failed
-     */
-    public static synchronized UserGroupInformation loginSimple(final 
Configuration config) throws IOException {
-        Validate.notNull(config);
-        UserGroupInformation.setConfiguration(config);
-        return UserGroupInformation.getLoginUser();
-    }
-
-    /**
-     * Initializes UserGroupInformation with the given Configuration and 
returns
-     * UserGroupInformation.isSecurityEnabled().
-     * All checks for isSecurityEnabled() should happen through this method.
-     *
-     * @param config the given configuration
-     *
-     * @return true if kerberos is enabled on the given configuration, false 
otherwise
-     *
-     */
-    public static boolean isSecurityEnabled(final Configuration config) {
-        Validate.notNull(config);
-        return 
KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION));
-    }
-}
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/package-info.java 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/package-info.java
deleted file mode 100644
index 464c6db341e..00000000000
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2;
\ No newline at end of file
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
deleted file mode 100644
index 7b025d16378..00000000000
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink;
-
-import java.io.IOException;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.hdfs2.AbstractHdfsConnector;
-import org.apache.pulsar.io.hdfs2.HdfsResources;
-
-/**
- * A Simple abstract class for HDFS sink.
- * Users need to implement extractKeyValue function to use this sink.
- */
-@Slf4j
-public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector 
implements Sink<V> {
-
-    protected HdfsSinkConfig hdfsSinkConfig;
-    protected BlockingQueue<Record<V>> unackedRecords;
-    protected HdfsSyncThread<V> syncThread;
-    private Path path;
-    private FSDataOutputStream hdfsStream;
-    private DateTimeFormatter subdirectoryFormatter;
-
-    public abstract KeyValue<K, V> extractKeyValue(Record<V> record);
-    protected abstract void createWriter() throws IOException;
-
-    @Override
-    public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
-       hdfsSinkConfig = HdfsSinkConfig.load(config);
-       hdfsSinkConfig.validate();
-       connectorConfig = hdfsSinkConfig;
-       unackedRecords = new LinkedBlockingQueue<Record<V>> 
(hdfsSinkConfig.getMaxPendingRecords());
-       if (hdfsSinkConfig.getSubdirectoryPattern() != null) {
-           subdirectoryFormatter = 
DateTimeFormatter.ofPattern(hdfsSinkConfig.getSubdirectoryPattern());
-       }
-       connectToHdfs();
-       createWriter();
-       launchSyncThread();
-    }
-
-    @Override
-    public void close() throws Exception {
-       syncThread.halt();
-       syncThread.join(0);
-    }
-
-    protected final void connectToHdfs() throws IOException {
-       try {
-           HdfsResources resources = hdfsResources.get();
-
-           if (resources.getConfiguration() == null) {
-               resources = this.resetHDFSResources(hdfsSinkConfig);
-               hdfsResources.set(resources);
-           }
-       } catch (IOException ex) {
-          hdfsResources.set(new HdfsResources(null, null, null));
-          throw ex;
-       }
-    }
-
-    protected FSDataOutputStream getHdfsStream() throws 
IllegalArgumentException, IOException {
-        if (hdfsStream == null) {
-            Path path = getPath();
-            FileSystem fs = getFileSystemAsUser(getConfiguration(), 
getUserGroupInformation());
-            hdfsStream = fs.exists(path) ? fs.append(path) : fs.create(path);
-        }
-        return hdfsStream;
-    }
-
-    protected final Path getPath() {
-        if (path == null) {
-            String ext = "";
-            if (StringUtils.isNotBlank(hdfsSinkConfig.getFileExtension())) {
-                ext = hdfsSinkConfig.getFileExtension();
-            } else if (getCompressionCodec() != null) {
-                ext = getCompressionCodec().getDefaultExtension();
-            }
-
-            String directory = hdfsSinkConfig.getDirectory();
-            if (subdirectoryFormatter != null) {
-                directory = FilenameUtils.concat(directory, 
LocalDateTime.now().format(subdirectoryFormatter));
-            }
-            path = new Path(FilenameUtils.concat(directory,
-                    hdfsSinkConfig.getFilenamePrefix() + "-" + 
System.currentTimeMillis() + ext));
-            log.info("Create path: {}", path);
-        }
-        return path;
-    }
-
-    protected final void launchSyncThread() throws IOException {
-        syncThread = new HdfsSyncThread<V>(getHdfsStream(), unackedRecords, 
hdfsSinkConfig.getSyncInterval());
-        syncThread.start();
-    }
-}
\ No newline at end of file
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
deleted file mode 100644
index 9e1c6090fb5..00000000000
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Map;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.experimental.Accessors;
-import org.apache.commons.lang.StringUtils;
-import org.apache.pulsar.io.hdfs2.AbstractHdfsConfig;
-
-/**
- * Configuration object for all HDFS Sink components.
- */
-@Data
-@EqualsAndHashCode(callSuper = false)
-@Accessors(chain = true)
-public class HdfsSinkConfig extends AbstractHdfsConfig implements Serializable 
{
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * The prefix of the files to create inside the HDFS directory, i.e. a 
value of "topicA"
-     * will result in files named topicA-, topicA-, etc being produced
-     */
-    private String filenamePrefix;
-
-    /**
-     * The extension to add to the files written to HDFS, e.g. '.txt', '.seq', 
etc.
-     */
-    private String fileExtension;
-
-    /**
-     * The character to use to separate records in a text file. If no value is 
provided
-     * then the content from all of the records will be concatenated together 
in one continuous
-     * byte array.
-     */
-    private char separator;
-
-    /**
-     * The interval (in milliseconds) between calls to flush data to HDFS disk.
-     */
-    private long syncInterval;
-
-    /**
-     * The maximum number of records that we hold in memory before acking. 
Default is Integer.MAX_VALUE.
-     * Setting this value to one, results in every record being sent to disk 
before the record is acked,
-     * while setting it to a higher values allows us to buffer records before 
flushing them all to disk.
-     */
-    private int maxPendingRecords = Integer.MAX_VALUE;
-
-    /**
-     * A subdirectory associated with the created time of the sink.
-     * The pattern is the formatted pattern of {@link 
AbstractHdfsConfig#getDirectory()}'s subdirectory.
-     *
-     * @see java.time.format.DateTimeFormatter for pattern's syntax
-     */
-    private String subdirectoryPattern;
-
-    public static HdfsSinkConfig load(String yamlFile) throws IOException {
-       ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-       return mapper.readValue(new File(yamlFile), HdfsSinkConfig.class);
-    }
-
-    public static HdfsSinkConfig load(Map<String, Object> map) throws 
IOException {
-       ObjectMapper mapper = new ObjectMapper();
-       return mapper.readValue(mapper.writeValueAsString(map), 
HdfsSinkConfig.class);
-    }
-
-    @Override
-    public void validate() {
-        super.validate();
-        if ((StringUtils.isEmpty(fileExtension) && getCompression() == null)
-                || StringUtils.isEmpty(filenamePrefix)) {
-            throw new IllegalArgumentException("Required property not set.");
-        }
-
-        if (syncInterval < 0) {
-            throw new IllegalArgumentException("Sync Interval cannot be 
negative");
-        }
-
-        if (maxPendingRecords < 1) {
-            throw new IllegalArgumentException("Max Pending Records must be a 
positive integer");
-        }
-
-        if (subdirectoryPattern != null) {
-            try {
-                LocalDateTime.of(2020, 1, 1, 12, 
0).format(DateTimeFormatter.ofPattern(subdirectoryPattern));
-            } catch (Exception e) {
-                throw new IllegalArgumentException(subdirectoryPattern + " is 
not a valid pattern: " + e.getMessage());
-            }
-        }
-    }
-}
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSyncThread.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSyncThread.java
deleted file mode 100644
index 9ddd83f4423..00000000000
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSyncThread.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.hadoop.fs.Syncable;
-import org.apache.pulsar.functions.api.Record;
-
-/**
- * A thread that runs in the background and acknowledges Records
- * after they have been written to disk.
- *
- * @param <V>
- */
-public class HdfsSyncThread<V> extends Thread {
-
-    private final Syncable stream;
-    private final BlockingQueue<Record<V>> unackedRecords;
-    private final long syncInterval;
-    private boolean keepRunning = true;
-
-    public HdfsSyncThread(Syncable stream, BlockingQueue<Record<V>> 
unackedRecords, long syncInterval) {
-      this.stream = stream;
-      this.unackedRecords = unackedRecords;
-      this.syncInterval = syncInterval;
-    }
-
-    @Override
-    public void run() {
-       while (keepRunning) {
-         try {
-            Thread.sleep(syncInterval);
-            ackRecords();
-         } catch (InterruptedException e) {
-            return;
-         } catch (IOException e) {
-            e.printStackTrace();
-         }
-       }
-    }
-
-    public final void halt() throws IOException, InterruptedException {
-       keepRunning = false;
-       ackRecords();
-    }
-
-    private void ackRecords() throws IOException, InterruptedException {
-
-        if (CollectionUtils.isEmpty(unackedRecords)) {
-           return;
-        }
-
-        synchronized (stream) {
-          stream.hsync();
-        }
-
-        while (!unackedRecords.isEmpty()) {
-          unackedRecords.take().ack();
-        }
-    }
-}
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/package-info.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/package-info.java
deleted file mode 100644
index 238a441ee0e..00000000000
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink;
\ No newline at end of file
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java
deleted file mode 100644
index 355c00080ef..00000000000
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.seq;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.SequenceFile.Writer.Option;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * HDFS Sink that writes it contents to HDFS as Sequence Files.
- *
- * @param <K> - The incoming Key type
- * @param <V> - The incoming Value type
- * @param <HdfsK> - The HDFS Key type
- * @param <HdfsV> - The HDFS Value type
- */
-public abstract class HdfsAbstractSequenceFileSink<K, V, HdfsK, HdfsV>
-    extends HdfsAbstractSink<K, V> implements Sink<V> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(HdfsAbstractSequenceFileSink.class);
-
-    protected AtomicLong counter;
-    protected FSDataOutputStream hdfsStream;
-    protected Writer writer = null;
-
-    public abstract KeyValue<HdfsK, HdfsV> convert(KeyValue<K, V> kv);
-
-    @Override
-    public void close() throws Exception {
-       writer.close();
-       super.close();
-    }
-
-    @Override
-    protected void createWriter() throws IOException {
-       writer = getWriter();
-    }
-
-    @Override
-    public void write(Record<V> record) {
-       try {
-            KeyValue<K, V> kv = extractKeyValue(record);
-            KeyValue<HdfsK, HdfsV> keyValue = convert(kv);
-            writer.append(keyValue.getKey(), keyValue.getValue());
-            unackedRecords.put(record);
-        } catch (IOException | InterruptedException e) {
-            LOG.error("Unable to write to file " + getPath(), e);
-            record.fail();
-        }
-    }
-
-    protected Writer getWriter() throws IOException {
-        counter = new AtomicLong(0);
-        List<Option> options = getOptions();
-        return SequenceFile.createWriter(getConfiguration(),
-                options.toArray(new Option[options.size()]));
-     }
-
-    protected List<Option> getOptions() throws IllegalArgumentException, 
IOException {
-        List<Option> list = new ArrayList<Option>();
-        list.add(Writer.stream(getHdfsStream()));
-
-        if (getCompressionCodec() != null) {
-            list.add(Writer.compression(SequenceFile.CompressionType.RECORD, 
getCompressionCodec()));
-        }
-        return list;
-    }
-}
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
deleted file mode 100644
index 8fa908c3c8c..00000000000
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.seq;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.SequenceFile.Writer.Option;
-import org.apache.hadoop.io.Text;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-
-/**
- * This Sink should be used when the records are originating from a sequential 
source,
- * and we want to retain the record sequence.This class uses the record's 
sequence id as
- * the sequence id in the HDFS Sequence File if it is available, if not a 
sequence id is
- * auto-generated for each new record.
- */
-public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, 
String, LongWritable, Text> {
-
-    private AtomicLong counter;
-
-    @Override
-    public Writer getWriter() throws IOException {
-       counter = new AtomicLong(0);
-
-       return SequenceFile
-                .createWriter(
-                   getConfiguration(),
-                   getOptions().toArray(new Option[getOptions().size()]));
-    }
-
-    @Override
-    protected List<Option> getOptions() throws IllegalArgumentException, 
IOException {
-        List<Option> opts = super.getOptions();
-        opts.add(Writer.keyClass(LongWritable.class));
-        opts.add(Writer.valueClass(Text.class));
-        return opts;
-    }
-
-    @Override
-    public KeyValue<Long, String> extractKeyValue(Record<String> record) {
-       Long sequence = record.getRecordSequence().orElseGet(() -> 
counter.incrementAndGet());
-       return new KeyValue<>(sequence, record.getValue());
-    }
-
-    @Override
-    public KeyValue<LongWritable, Text> convert(KeyValue<Long, String> kv) {
-       return new KeyValue<>(new LongWritable(kv.getKey()), new 
Text(kv.getValue()));
-    }
-}
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSink.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSink.java
deleted file mode 100644
index 3c643e287a9..00000000000
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSink.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.seq;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.SequenceFile.Writer.Option;
-import org.apache.hadoop.io.Text;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-
-/**
- * A Simple Sink class for Hdfs Sequence File.
- */
-public class HdfsTextSink extends
-     HdfsAbstractSequenceFileSink<String, String, Text, Text> {
-
-    @Override
-    protected List<Option> getOptions() throws IllegalArgumentException, 
IOException {
-        List<Option> opts = super.getOptions();
-        opts.add(Writer.keyClass(Text.class));
-        opts.add(Writer.valueClass(Text.class));
-        return opts;
-    }
-
-    @Override
-    public KeyValue<String, String> extractKeyValue(Record<String> record) {
-       String key = record.getKey().orElseGet(() -> record.getValue());
-       return new KeyValue<>(key, record.getValue());
-    }
-
-    @Override
-    public KeyValue<Text, Text> convert(KeyValue<String, String> kv) {
-       return new KeyValue<>(new Text(kv.getKey()), new Text(kv.getValue()));
-    }
-}
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/package-info.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/package-info.java
deleted file mode 100644
index 0bcb0d837fc..00000000000
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.seq;
\ No newline at end of file
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsAbstractTextFileSink.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsAbstractTextFileSink.java
deleted file mode 100644
index d96309695f1..00000000000
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsAbstractTextFileSink.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.text;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for HDFS Sinks that writes there contents to HDFS as Text Files.
- *
- * @param <K>
- * @param <V>
- */
-public abstract class HdfsAbstractTextFileSink<K, V> extends 
HdfsAbstractSink<K, V> implements Sink<V> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(HdfsAbstractTextFileSink.class);
-
-    protected OutputStreamWriter writer;
-
-    @Override
-    protected void createWriter() throws IOException {
-        writer = new OutputStreamWriter(new 
BufferedOutputStream(openHdfsStream()), getEncoding());
-    }
-
-    @Override
-    public void close() throws Exception {
-        writer.close();
-        super.close();
-    }
-
-    @Override
-    public void write(Record<V> record) {
-       try {
-           KeyValue<K, V> kv = extractKeyValue(record);
-           writer.write(kv.getValue().toString());
-
-           if (hdfsSinkConfig.getSeparator() != '\u0000') {
-              writer.write(hdfsSinkConfig.getSeparator());
-           }
-           unackedRecords.put(record);
-        } catch (IOException | InterruptedException e) {
-            LOG.error("Unable to write to file " + getPath(), e);
-            record.fail();
-        }
-    }
-
-    private OutputStream openHdfsStream() throws IOException {
-       if (hdfsSinkConfig.getCompression() != null) {
-          return getCompressionCodec().createOutputStream(getHdfsStream());
-       } else {
-          return getHdfsStream();
-       }
-    }
-}
\ No newline at end of file
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSink.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSink.java
deleted file mode 100644
index 4de5de1aedf..00000000000
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSink.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.text;
-
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Sink;
-
-/**
- * A Simple Sink class for Hdfs Text File.
- */
-public class HdfsStringSink extends HdfsAbstractTextFileSink<String, String> 
implements Sink<String> {
-    @Override
-    public KeyValue<String, String> extractKeyValue(Record<String> record) {
-       String key = record.getKey().orElseGet(() -> record.getValue());
-       return new KeyValue<>(key, record.getValue());
-    }
-}
diff --git 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/package-info.java
 
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/package-info.java
deleted file mode 100644
index a5618597197..00000000000
--- 
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.text;
\ No newline at end of file
diff --git 
a/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml
deleted file mode 100644
index 8bc33954c29..00000000000
--- a/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-name: hdfs2
-description: Writes data into HDFS 2.x
-sinkClass: org.apache.pulsar.io.hdfs2.sink.text.HdfsStringSink
-sinkConfigClass: org.apache.pulsar.io.hdfs2.sink.HdfsSinkConfig
diff --git a/pulsar-io/hdfs2/src/main/resources/findbugsExclude.xml 
b/pulsar-io/hdfs2/src/main/resources/findbugsExclude.xml
deleted file mode 100644
index 980349c4d8d..00000000000
--- a/pulsar-io/hdfs2/src/main/resources/findbugsExclude.xml
+++ /dev/null
@@ -1,58 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<FindBugsFilter>
-  <!-- Ignore violations that were present when the rule was enabled -->
-  <Match>
-    <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/>
-    <Method name="getConfiguration"/>
-    <Bug pattern="EI_EXPOSE_REP"/>
-  </Match>
-  <Match>
-    <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/>
-    <Method name="getFileSystem"/>
-    <Bug pattern="EI_EXPOSE_REP"/>
-  </Match>
-  <Match>
-    <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/>
-    <Method name="getUserGroupInformation"/>
-    <Bug pattern="EI_EXPOSE_REP"/>
-  </Match>
-  <Match>
-    <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/>
-    <Method name="&lt;init&gt;"/>
-    <Bug pattern="EI_EXPOSE_REP2"/>
-  </Match>
-  <Match>
-    <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/>
-    <Method name="&lt;init&gt;"/>
-    <Bug pattern="EI_EXPOSE_REP2"/>
-  </Match>
-  <Match>
-    <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/>
-    <Method name="&lt;init&gt;"/>
-    <Bug pattern="EI_EXPOSE_REP2"/>
-  </Match>
-  <Match>
-    <Class name="org.apache.pulsar.io.hdfs2.sink.HdfsSyncThread"/>
-    <Method name="&lt;init&gt;"/>
-    <Bug pattern="EI_EXPOSE_REP2"/>
-  </Match>
-</FindBugsFilter>
diff --git 
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java
 
b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java
deleted file mode 100644
index 2660199dc18..00000000000
--- 
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
-
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink;
-import org.mockito.Mock;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.testng.annotations.BeforeMethod;
-
-/**
- * Simple base class for all the HDFS sink test cases.
- * Provides utility methods for sending records to the sink.
- *
- */
-public abstract class AbstractHdfsSinkTest<K, V> {
-    
-    @Mock
-    protected SinkContext mockSinkContext;
-    
-    @Mock
-    protected Record<V> mockRecord;
-    
-    protected Map<String, Object> map;
-    protected HdfsAbstractSink<K, V> sink;
-    
-    @SuppressWarnings("unchecked")
-    @BeforeMethod(alwaysRun = true)
-    public final void setUp() throws Exception {
-        map = new HashMap<String, Object> ();
-        map.put("hdfsConfigResources", 
"../pulsar/pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml,"
-                + 
"../pulsar/pulsar-io/hdfs2/src/test/resources/hadoop/hdfs-site.xml");
-        map.put("directory", "/tmp/testing");
-        map.put("filenamePrefix", "prefix");
-        
-        mockSinkContext = mock(SinkContext.class);
-        
-        mockRecord = mock(Record.class);
-        when(mockRecord.getRecordSequence()).thenAnswer(new 
Answer<Optional<Long>>() {
-          long sequenceCounter = 0;
-          public Optional<Long> answer(InvocationOnMock invocation) throws 
Throwable {
-             return Optional.of(sequenceCounter++);
-          }});
-        
-        when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() {
-            long sequenceCounter = 0;
-            public Optional<String> answer(InvocationOnMock invocation) throws 
Throwable {
-               return Optional.of( "key-" + sequenceCounter++);
-            }});
-        
-        when(mockRecord.getValue()).thenAnswer(new Answer<String>() {
-            long sequenceCounter = 0;
-            public String answer(InvocationOnMock invocation) throws Throwable 
{
-                 return new String( "value-" + sequenceCounter++ + "-" + 
UUID.randomUUID());
-            }});
-        
-        createSink();
-    }
-
-    protected abstract void createSink();
-
-    protected final void send(int numRecords) throws Exception {
-        for (int idx = 0; idx < numRecords; idx++) {
-            sink.write(mockRecord);
-        }
-    }
-    
-    protected final void runFor(int numSeconds) throws InterruptedException {
-        Producer producer = new Producer();
-        producer.start();
-        Thread.sleep(numSeconds * 1000); // Run for N seconds
-        producer.halt();
-        producer.join(2000);
-    }
-    
-    protected final class Producer extends Thread {
-        public boolean keepRunning = true;
-        @Override
-        public void run() {
-            while (keepRunning)
-                try {
-                    sink.write(mockRecord);
-                } catch (Exception e) {
-                    // TODO Auto-generated catch block
-                    e.printStackTrace();
-                }
-        }
-        
-        public void halt() { 
-            keepRunning = false; 
-        }
-        
-    }
-}
diff --git 
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
 
b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
deleted file mode 100644
index 1d4b379011e..00000000000
--- 
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.pulsar.io.hdfs2.Compression;
-import org.apache.pulsar.io.hdfs2.sink.HdfsSinkConfig;
-import org.testng.annotations.Test;
-
-import com.fasterxml.jackson.databind.exc.InvalidFormatException;
-
-
-public class HdfsSinkConfigTests {
-       
-       @Test
-       public final void loadFromYamlFileTest() throws IOException {
-               File yamlFile = getFile("sinkConfig.yaml");
-               HdfsSinkConfig config = 
HdfsSinkConfig.load(yamlFile.getAbsolutePath());
-               assertNotNull(config);
-               assertEquals("core-site.xml", config.getHdfsConfigResources());
-               assertEquals("/foo/bar", config.getDirectory());
-               assertEquals("prefix", config.getFilenamePrefix());
-               assertEquals(Compression.SNAPPY, config.getCompression());
-               assertEquals("yyyy-MM-dd", config.getSubdirectoryPattern());
-       }
-       
-       @Test
-       public final void loadFromMapTest() throws IOException {
-               Map<String, Object> map = new HashMap<String, Object> ();
-               map.put("hdfsConfigResources", "core-site.xml");
-               map.put("directory", "/foo/bar");
-               map.put("filenamePrefix", "prefix");
-               map.put("compression", "SNAPPY");
-               map.put("subdirectoryPattern", "yy-MM-dd");
-               
-               HdfsSinkConfig config = HdfsSinkConfig.load(map);
-               assertNotNull(config);
-               assertEquals("core-site.xml", config.getHdfsConfigResources());
-               assertEquals("/foo/bar", config.getDirectory());
-               assertEquals("prefix", config.getFilenamePrefix());
-               assertEquals(Compression.SNAPPY, config.getCompression());
-               assertEquals("yy-MM-dd", config.getSubdirectoryPattern());
-       }
-       
-       @Test
-       public final void validValidateTest() throws IOException {
-               Map<String, Object> map = new HashMap<String, Object> ();
-               map.put("hdfsConfigResources", "core-site.xml");
-               map.put("directory", "/foo/bar");
-               map.put("filenamePrefix", "prefix");
-               map.put("fileExtension", ".txt");
-               
-               HdfsSinkConfig config = HdfsSinkConfig.load(map);
-               config.validate();
-       }
-       
-       @Test(expectedExceptions = IllegalArgumentException.class, 
-                       expectedExceptionsMessageRegExp = "Required property 
not set.")
-       public final void missingDirectoryValidateTest() throws IOException {
-               Map<String, Object> map = new HashMap<String, Object> ();
-               map.put("hdfsConfigResources", "core-site.xml");
-               
-               HdfsSinkConfig config = HdfsSinkConfig.load(map);
-               config.validate();
-       }
-       
-       @Test(expectedExceptions = IllegalArgumentException.class, 
-                 expectedExceptionsMessageRegExp = "Required property not 
set.")
-       public final void missingHdfsConfigsValidateTest() throws IOException {
-               Map<String, Object> map = new HashMap<String, Object> ();
-               map.put("directory", "/foo/bar");
-               
-               HdfsSinkConfig config = HdfsSinkConfig.load(map);
-               config.validate();
-       }
-       
-       @Test(expectedExceptions = InvalidFormatException.class)
-       public final void invalidCodecValidateTest() throws IOException {
-               Map<String, Object> map = new HashMap<String, Object> ();
-               map.put("hdfsConfigResources", "core-site.xml");
-               map.put("directory", "/foo/bar");
-               map.put("filenamePrefix", "prefix");
-               map.put("fileExtension", ".txt");
-               map.put("compression", "bad value");
-               
-               HdfsSinkConfig config = HdfsSinkConfig.load(map);
-               config.validate();
-       }
-       
-       @Test(expectedExceptions = IllegalArgumentException.class, 
-                 expectedExceptionsMessageRegExp = "Sync Interval cannot be 
negative")
-       public final void invalidSyncIntervalTest() throws IOException {
-               Map<String, Object> map = new HashMap<String, Object> ();
-               map.put("hdfsConfigResources", "core-site.xml");
-               map.put("directory", "/foo/bar");
-               map.put("filenamePrefix", "prefix");
-               map.put("fileExtension", ".txt");
-               map.put("syncInterval", -1);
-               
-               HdfsSinkConfig config = HdfsSinkConfig.load(map);
-               config.validate();
-       }
-       
-       @Test(expectedExceptions = IllegalArgumentException.class, 
-                 expectedExceptionsMessageRegExp = "Max Pending Records must 
be a positive integer")
-       public final void invalidMaxPendingRecordsTest() throws IOException {
-               Map<String, Object> map = new HashMap<String, Object> ();
-               map.put("hdfsConfigResources", "core-site.xml");
-               map.put("directory", "/foo/bar");
-               map.put("filenamePrefix", "prefix");
-               map.put("fileExtension", ".txt");
-               map.put("maxPendingRecords", 0);
-               
-               HdfsSinkConfig config = HdfsSinkConfig.load(map);
-               config.validate();
-       }
-       
-       @Test(expectedExceptions = IllegalArgumentException.class, 
-                 expectedExceptionsMessageRegExp = "Values for both 
kerberosUserPrincipal & keytab are required.")
-       public final void kerberosValidateTest() throws IOException {
-               Map<String, Object> map = new HashMap<String, Object> ();
-               map.put("hdfsConfigResources", "core-site.xml");
-               map.put("directory", "/foo/bar");
-               map.put("filenamePrefix", "prefix");
-               map.put("keytab", "/etc/keytab/hdfs.client.ktab");
-               
-               HdfsSinkConfig config = HdfsSinkConfig.load(map);
-               config.validate();
-       }
-       
-       private File getFile(String name) {
-               ClassLoader classLoader = getClass().getClassLoader();
-               return new File(classLoader.getResource(name).getFile());
-       }
-}
diff --git 
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialSinkTests.java
 
b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialSinkTests.java
deleted file mode 100644
index 66cfc5b547c..00000000000
--- 
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialSinkTests.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.seq;
-
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertNotNull;
-import org.apache.pulsar.io.hdfs2.sink.AbstractHdfsSinkTest;
-import org.testng.SkipException;
-import org.testng.annotations.Test;
-
-public class HdfsSequentialSinkTests extends AbstractHdfsSinkTest<Long, 
String> {
-
-    @Override
-    protected void createSink() {
-        sink = new HdfsSequentialTextSink();
-    }
-
-    @Test
-    public final void write100Test() throws Exception {
-        map.put("filenamePrefix", "write100Test-seq");
-        map.put("fileExtension", ".seq");
-        map.put("syncInterval", 1000);
-        sink.open(map, mockSinkContext);
-
-        assertNotNull(sink);
-        send(100);
-
-        Thread.sleep(2000);
-        verify(mockRecord, times(100)).ack();
-        sink.close();
-    }
-
-    @Test
-    public final void write5000Test() throws Exception {
-        map.put("filenamePrefix", "write5000Test-seq");
-        map.put("fileExtension", ".seq");
-        map.put("syncInterval", 1000);
-        sink.open(map, mockSinkContext);
-
-        assertNotNull(sink);
-        send(5000);
-
-        Thread.sleep(2000);
-        verify(mockRecord, times(5000)).ack();
-        sink.close();
-    }
-
-    @Test
-    public final void tenSecondTest() throws Exception {
-        map.put("filenamePrefix", "tenSecondTest-seq");
-        map.put("fileExtension", ".seq");
-        map.put("syncInterval", 1000);
-        sink.open(map, mockSinkContext);
-        runFor(10);
-        sink.close();
-    }
-
-    @Test
-    public final void bzip2CompressionTest() throws Exception {
-        map.put("filenamePrefix", "bzip2CompressionTest-seq");
-        map.put("compression", "BZIP2");
-        map.remove("fileExtension");
-        sink.open(map, mockSinkContext);
-        send(5000);
-        sink.close();
-        verify(mockRecord, times(5000)).ack();
-    }
-
-    @Test
-    public final void deflateCompressionTest() throws Exception {
-        map.put("filenamePrefix", "deflateCompressionTest-seq");
-        map.put("compression", "DEFLATE");
-        map.remove("fileExtension");
-        sink.open(map, mockSinkContext);
-        send(5000);
-        sink.close();
-        verify(mockRecord, times(5000)).ack();
-    }
-
-    @Test
-    public final void zStandardCompressionTest() throws Exception {
-        if (System.getenv("LD_LIBRARY_PATH") == null) {
-            throw new SkipException("Skip zStandardCompressionTest since 
LD_LIBRARY_PATH is not set");
-        }
-        map.put("filenamePrefix", "zStandardCompressionTest-seq");
-        map.put("compression", "ZSTANDARD");
-        map.remove("fileExtension");
-        sink.open(map, mockSinkContext);
-        send(5000);
-        sink.close();
-        verify(mockRecord, times(5000)).ack();
-    }
-}
diff --git 
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSinkTests.java
 
b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSinkTests.java
deleted file mode 100644
index a64345c4e11..00000000000
--- 
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSinkTests.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.seq;
-
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertNotNull;
-import org.apache.pulsar.io.hdfs2.sink.AbstractHdfsSinkTest;
-import org.testng.SkipException;
-import org.testng.annotations.Test;
-
-public class HdfsTextSinkTests extends AbstractHdfsSinkTest<String, String> {
-
-    @Override
-    protected void createSink() {
-        sink = new HdfsTextSink();
-    }
-
-    @Test
-    public final void write100Test() throws Exception {
-        map.put("filenamePrefix", "write100TestText-seq");
-        map.put("fileExtension", ".seq");
-        map.put("syncInterval", 1000);
-        sink.open(map, mockSinkContext);
-
-        assertNotNull(sink);
-        assertNotNull(mockRecord);
-        send(100);
-
-        Thread.sleep(2000);
-        verify(mockRecord, times(100)).ack();
-        sink.close();
-    }
-
-    @Test
-    public final void write5000Test() throws Exception {
-        map.put("filenamePrefix", "write5000TestText-seq");
-        map.put("fileExtension", ".seq");
-        map.put("syncInterval", 1000);
-        sink.open(map, mockSinkContext);
-
-        assertNotNull(sink);
-        assertNotNull(mockRecord);
-        send(5000);
-
-        Thread.sleep(2000);
-        verify(mockRecord, times(5000)).ack();
-        sink.close();
-    }
-
-    @Test
-    public final void tenSecondTest() throws Exception {
-        map.put("filenamePrefix", "tenSecondTestText-seq");
-        map.put("fileExtension", ".seq");
-        map.put("syncInterval", 1000);
-        sink.open(map, mockSinkContext);
-
-        assertNotNull(mockRecord);
-
-        runFor(10);
-        sink.close();
-    }
-
-    @Test
-    public final void bzip2CompressionTest() throws Exception {
-        map.put("filenamePrefix", "bzip2CompressionTestText-seq");
-        map.put("compression", "BZIP2");
-        map.remove("fileExtension");
-        sink.open(map, mockSinkContext);
-
-        assertNotNull(mockRecord);
-
-        send(5000);
-        sink.close();
-        verify(mockRecord, times(5000)).ack();
-    }
-
-    @Test
-    public final void deflateCompressionTest() throws Exception {
-        map.put("filenamePrefix", "deflateCompressionTestText-seq");
-        map.put("compression", "DEFLATE");
-        map.remove("fileExtension");
-        sink.open(map, mockSinkContext);
-
-        assertNotNull(mockRecord);
-        send(5000);
-        sink.close();
-        verify(mockRecord, times(5000)).ack();
-    }
-
-    @Test
-    public final void zStandardCompressionTest() throws Exception {
-        if (System.getenv("LD_LIBRARY_PATH") == null) {
-            throw new SkipException("Skip zStandardCompressionTest since 
LD_LIBRARY_PATH is not set");
-        }
-        map.put("filenamePrefix", "zStandardCompressionTestText-seq");
-        map.put("compression", "ZSTANDARD");
-        map.remove("fileExtension");
-        sink.open(map, mockSinkContext);
-
-        assertNotNull(mockRecord);
-        send(5000);
-        sink.close();
-        verify(mockRecord, times(5000)).ack();
-    }
-}
diff --git 
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSinkTests.java
 
b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSinkTests.java
deleted file mode 100644
index 7bc5c15ee6a..00000000000
--- 
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSinkTests.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.text;
-
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import org.apache.pulsar.io.hdfs2.sink.AbstractHdfsSinkTest;
-import org.testng.SkipException;
-import org.testng.annotations.Test;
-
-public class HdfsStringSinkTests extends AbstractHdfsSinkTest<String, String> {
-
-    @Override
-    protected void createSink() {
-        sink = new HdfsStringSink();
-    }
-
-    @Test
-    public final void write5000Test() throws Exception {
-        map.put("filenamePrefix", "write5000Test");
-        map.put("fileExtension", ".txt");
-        map.put("separator", '\n');
-        sink.open(map, mockSinkContext);
-        send(5000);
-        sink.close();
-        verify(mockRecord, times(5000)).ack();
-    }
-
-    @Test
-    public final void fiveByTwoThousandTest() throws Exception {
-        map.put("filenamePrefix", "fiveByTwoThousandTest");
-        map.put("fileExtension", ".txt");
-        map.put("separator", '\n');
-        sink.open(map, mockSinkContext);
-
-        for (int idx = 1; idx < 6; idx++) {
-            send(2000);
-        }
-        sink.close();
-        verify(mockRecord, times(2000 * 5)).ack();
-    }
-
-    @Test
-    public final void tenSecondTest() throws Exception {
-        map.put("filenamePrefix", "tenSecondTest");
-        map.put("fileExtension", ".txt");
-        map.put("separator", '\n');
-        sink.open(map, mockSinkContext);
-        runFor(10);
-        sink.close();
-    }
-
-    @Test
-    public final void maxPendingRecordsTest() throws Exception {
-        map.put("filenamePrefix", "maxPendingRecordsTest");
-        map.put("fileExtension", ".txt");
-        map.put("separator", '\n');
-        map.put("maxPendingRecords", 500);
-        sink.open(map, mockSinkContext);
-        runFor(10);
-        sink.close();
-    }
-
-    @Test
-    public final void bzip2CompressionTest() throws Exception {
-        map.put("filenamePrefix", "bzip2CompressionTest");
-        map.put("compression", "BZIP2");
-        map.remove("fileExtension");
-        map.put("separator", '\n');
-        sink.open(map, mockSinkContext);
-        send(5000);
-        sink.close();
-        verify(mockRecord, times(5000)).ack();
-    }
-
-    @Test
-    public final void deflateCompressionTest() throws Exception {
-        map.put("filenamePrefix", "deflateCompressionTest");
-        map.put("compression", "DEFLATE");
-        map.put("fileExtension", ".deflate");
-        map.put("separator", '\n');
-        sink.open(map, mockSinkContext);
-        send(50000);
-        sink.close();
-        verify(mockRecord, times(50000)).ack();
-    }
-
-    @Test
-    public final void zStandardCompressionTest() throws Exception {
-        if (System.getenv("LD_LIBRARY_PATH") == null) {
-            throw new SkipException("Skip zStandardCompressionTest since 
LD_LIBRARY_PATH is not set");
-        }
-        map.put("filenamePrefix", "zStandardCompressionTest");
-        map.put("compression", "ZSTANDARD");
-        map.put("fileExtension", ".zstandard");
-        map.put("separator", '\n');
-        sink.open(map, mockSinkContext);
-        send(50000);
-        sink.close();
-        verify(mockRecord, times(50000)).ack();
-    }
-}
diff --git a/pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml 
b/pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml
deleted file mode 100644
index 31d1e98c475..00000000000
--- a/pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<configuration>
-    <property>
-        <name>fs.defaultFS</name>
-        <value>hdfs://0.0.0.0:8020</value>
-    </property>
-    <property>
-        <name>io.compression.codecs</name>
-        <value>org.apache.hadoop.io.compress.GzipCodec,
-               org.apache.hadoop.io.compress.DefaultCodec,
-               org.apache.hadoop.io.compress.SnappyCodec</value>
-   </property>
-</configuration>
diff --git a/pulsar-io/hdfs2/src/test/resources/hadoop/hdfs-site.xml 
b/pulsar-io/hdfs2/src/test/resources/hadoop/hdfs-site.xml
deleted file mode 100644
index bb722f1f634..00000000000
--- a/pulsar-io/hdfs2/src/test/resources/hadoop/hdfs-site.xml
+++ /dev/null
@@ -1,34 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<configuration>
-    <property>
-        <name>dfs.replication</name>
-        <value>1</value>
-    </property>
-    <property>
-       <name>dfs.client.use.datanode.hostname</name>
-       <value>true</value>
-    </property>
-    <property>
-       <name>dfs.support.append</name>
-       <value>true</value>
-   </property>
-</configuration>
diff --git a/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml 
b/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
deleted file mode 100644
index 47ab4f9737a..00000000000
--- a/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
+++ /dev/null
@@ -1,26 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-{
-"hdfsConfigResources": "core-site.xml",
-"directory": "/foo/bar",
-"filenamePrefix": "prefix",
-"compression": "SNAPPY",
-"subdirectoryPattern": "yyyy-MM-dd"
-}
\ No newline at end of file
diff --git a/pulsar-io/hdfs3/pom.xml b/pulsar-io/hdfs3/pom.xml
index f5a87879db9..e6e4e6f9a9a 100644
--- a/pulsar-io/hdfs3/pom.xml
+++ b/pulsar-io/hdfs3/pom.xml
@@ -18,7 +18,8 @@
     under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.pulsar</groupId>
@@ -29,7 +30,7 @@
   <name>Pulsar IO :: Hdfs3</name>
 
   <dependencies>
-     <dependency>
+    <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-core</artifactId>
       <version>${project.version}</version>
@@ -50,29 +51,28 @@
       <artifactId>commons-collections4</artifactId>
     </dependency>
 
-       <dependency>
-               <groupId>org.apache.hadoop</groupId>
-               <artifactId>hadoop-client</artifactId>
-               <version>${hadoop3.version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>jakarta.activation</groupId>
-            <artifactId>jakarta.activation-api</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.slf4j</groupId>
-            <artifactId>*</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro</artifactId>
-          </exclusion>
-        </exclusions>
-       </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>jakarta.activation</groupId>
+          <artifactId>jakarta.activation-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
 
     <dependency>
       <groupId>jakarta.activation</groupId>
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 46e946f10f8..6630e1f84a1 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -63,7 +63,6 @@
         <module>kafka-connect-adaptor</module>
         <module>kafka-connect-adaptor-nar</module>
         <module>debezium</module>
-        <module>hdfs2</module>
         <module>canal</module>
         <module>file</module>
         <module>netty</module>
@@ -97,7 +96,6 @@
         <module>hdfs3</module>
         <module>jdbc</module>
         <module>data-generator</module>
-        <module>hdfs2</module>
         <module>canal</module>
         <module>file</module>
         <module>netty</module>
diff --git a/tiered-storage/file-system/pom.xml 
b/tiered-storage/file-system/pom.xml
index 1ec1c816fcc..e0a31108718 100644
--- a/tiered-storage/file-system/pom.xml
+++ b/tiered-storage/file-system/pom.xml
@@ -49,6 +49,10 @@
                     <groupId>org.slf4j</groupId>
                     <artifactId>*</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>dnsjava</groupId>
+                    <artifactId>dnsjava</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
@@ -77,6 +81,10 @@
                     <groupId>javax.servlet</groupId>
                     <artifactId>servlet-api</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>dnsjava</groupId>
+                    <artifactId>dnsjava</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <!-- fix hadoop-commons vulnerable dependencies -->
@@ -121,6 +129,10 @@
                     <groupId>org.slf4j</groupId>
                     <artifactId>*</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>dnsjava</groupId>
+                    <artifactId>dnsjava</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 

Reply via email to