This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new ba82bf4c784 [fix][sec] Drop hdfs2 support, Upgrade hadoop3 to 3.4.0
and dnsjava to 3.6.2 to address CVE-2024-25638 (#23411)
ba82bf4c784 is described below
commit ba82bf4c78467c0716c7464a7fb7b30dd37cfaea
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Oct 11 19:22:35 2024 +0300
[fix][sec] Drop hdfs2 support, Upgrade hadoop3 to 3.4.0 and dnsjava to
3.6.2 to address CVE-2024-25638 (#23411)
---
.github/workflows/pulsar-ci.yaml | 2 +-
deployment/terraform-ansible/deploy-pulsar.yaml | 1 -
distribution/io/src/assemble/io.xml | 1 -
pom.xml | 60 ++++-
pulsar-bom/pom.xml | 5 -
pulsar-io/docs/pom.xml | 5 -
pulsar-io/hdfs2/pom.xml | 130 -----------
.../apache/pulsar/io/hdfs2/AbstractHdfsConfig.java | 76 -------
.../pulsar/io/hdfs2/AbstractHdfsConnector.java | 246 ---------------------
.../org/apache/pulsar/io/hdfs2/Compression.java | 26 ---
.../org/apache/pulsar/io/hdfs2/HdfsResources.java | 51 -----
.../org/apache/pulsar/io/hdfs2/SecurityUtil.java | 90 --------
.../org/apache/pulsar/io/hdfs2/package-info.java | 19 --
.../pulsar/io/hdfs2/sink/HdfsAbstractSink.java | 124 -----------
.../pulsar/io/hdfs2/sink/HdfsSinkConfig.java | 117 ----------
.../pulsar/io/hdfs2/sink/HdfsSyncThread.java | 79 -------
.../apache/pulsar/io/hdfs2/sink/package-info.java | 19 --
.../sink/seq/HdfsAbstractSequenceFileSink.java | 95 --------
.../io/hdfs2/sink/seq/HdfsSequentialTextSink.java | 70 ------
.../pulsar/io/hdfs2/sink/seq/HdfsTextSink.java | 53 -----
.../pulsar/io/hdfs2/sink/seq/package-info.java | 19 --
.../hdfs2/sink/text/HdfsAbstractTextFileSink.java | 78 -------
.../pulsar/io/hdfs2/sink/text/HdfsStringSink.java | 34 ---
.../pulsar/io/hdfs2/sink/text/package-info.java | 19 --
.../resources/META-INF/services/pulsar-io.yaml | 23 --
.../hdfs2/src/main/resources/findbugsExclude.xml | 58 -----
.../pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java | 120 ----------
.../pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java | 158 -------------
.../io/hdfs2/sink/seq/HdfsSequentialSinkTests.java | 110 ---------
.../io/hdfs2/sink/seq/HdfsTextSinkTests.java | 122 ----------
.../io/hdfs2/sink/text/HdfsStringSinkTests.java | 118 ----------
.../hdfs2/src/test/resources/hadoop/core-site.xml | 32 ---
.../hdfs2/src/test/resources/hadoop/hdfs-site.xml | 34 ---
pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml | 26 ---
pulsar-io/hdfs3/pom.xml | 50 ++---
pulsar-io/pom.xml | 2 -
tiered-storage/file-system/pom.xml | 12 +
37 files changed, 94 insertions(+), 2190 deletions(-)
diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index bf44c51b6ad..87d8cd7cf9a 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -1498,7 +1498,7 @@ jobs:
- name: trigger dependency check
run: |
mvn -B -ntp verify -PskipDocker,skip-all,owasp-dependency-check
-Dcheckstyle.skip=true -DskipTests \
- -pl '!distribution/server,!distribution/io,!distribution/offloaders,!tiered-storage/file-system,!pulsar-io/flume,!pulsar-io/hbase,!pulsar-io/hdfs2,!pulsar-io/hdfs3,!pulsar-io/docs,!pulsar-io/jdbc/openmldb'
+ -pl '!distribution/server,!distribution/io,!distribution/offloaders,!tiered-storage/file-system,!pulsar-io/flume,!pulsar-io/hbase,!pulsar-io/hdfs3,!pulsar-io/docs,!pulsar-io/jdbc/openmldb'
- name: Upload report
uses: actions/upload-artifact@v4
diff --git a/deployment/terraform-ansible/deploy-pulsar.yaml
b/deployment/terraform-ansible/deploy-pulsar.yaml
index db2fd1257ca..3a9f0fd942c 100644
--- a/deployment/terraform-ansible/deploy-pulsar.yaml
+++ b/deployment/terraform-ansible/deploy-pulsar.yaml
@@ -147,7 +147,6 @@
# - file
# - flume
# - hbase
-# - hdfs2
# - hdfs3
# - influxdb
# - jdbc-clickhouse
diff --git a/distribution/io/src/assemble/io.xml
b/distribution/io/src/assemble/io.xml
index f98ee14bb20..cf7731b4c85 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -63,7 +63,6 @@
<file><source>${basedir}/../../pulsar-io/kafka-connect-adaptor-nar/target/pulsar-io-kafka-connect-adaptor-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/hbase/target/pulsar-io-hbase-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/kinesis/target/pulsar-io-kinesis-${project.version}.nar</source></file>
-<file><source>${basedir}/../../pulsar-io/hdfs2/target/pulsar-io-hdfs2-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/hdfs3/target/pulsar-io-hdfs3-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/file/target/pulsar-io-file-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/data-generator/target/pulsar-io-data-generator-${project.version}.nar</source></file>
diff --git a/pom.xml b/pom.xml
index f99eb3066d5..b89dd1597cc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,7 +196,6 @@ flexible messaging model and an intuitive client
API.</description>
<clickhouse-jdbc.version>0.4.6</clickhouse-jdbc.version>
<mariadb-jdbc.version>2.7.5</mariadb-jdbc.version>
<openmldb-jdbc.version>0.4.4-hotfix1</openmldb-jdbc.version>
- <hdfs-offload-version3>3.3.5</hdfs-offload-version3>
<json-smart.version>2.4.10</json-smart.version>
<opensearch.version>2.16.0</opensearch.version>
<elasticsearch-java.version>8.12.1</elasticsearch-java.version>
@@ -207,9 +206,10 @@ flexible messaging model and an intuitive client
API.</description>
<wildfly-elytron.version>1.15.16.Final</wildfly-elytron.version>
<jsonwebtoken.version>0.11.1</jsonwebtoken.version>
<opencensus.version>0.28.0</opencensus.version>
- <hadoop2.version>2.10.2</hadoop2.version>
- <hadoop3.version>3.3.5</hadoop3.version>
- <hbase.version>2.4.16</hbase.version>
+ <hadoop3.version>3.4.0</hadoop3.version>
+ <dnsjava3.version>3.6.2</dnsjava3.version>
+ <hdfs-offload-version3>${hadoop3.version}</hdfs-offload-version3>
+ <hbase.version>2.6.0-hadoop3</hbase.version>
<guava.version>32.1.2-jre</guava.version>
<jcip.version>1.0</jcip.version>
<prometheus-jmx.version>0.16.1</prometheus-jmx.version>
@@ -1313,6 +1313,58 @@ flexible messaging model and an intuitive client
API.</description>
<version>${commons.collections4.version}</version>
</dependency>
+ <!-- support only hadoop 3 since hadoop 2 is EOL -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop3.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>dnsjava</groupId>
+ <artifactId>dnsjava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ <version>${hadoop3.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>dnsjava</groupId>
+ <artifactId>dnsjava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop3.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>dnsjava</groupId>
+ <artifactId>dnsjava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>dnsjava</groupId>
+ <artifactId>dnsjava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- dnsjava is pulled in by hadoop-common -->
+ <dependency>
+ <groupId>dnsjava</groupId>
+ <artifactId>dnsjava</artifactId>
+ <version>${dnsjava3.version}</version>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
<groupId>com.lmax</groupId>
diff --git a/pulsar-bom/pom.xml b/pulsar-bom/pom.xml
index d195411fa64..e674301f18a 100644
--- a/pulsar-bom/pom.xml
+++ b/pulsar-bom/pom.xml
@@ -495,11 +495,6 @@
<artifactId>pulsar-io-hbase</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-io-hdfs2</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io-hdfs3</artifactId>
diff --git a/pulsar-io/docs/pom.xml b/pulsar-io/docs/pom.xml
index ac4ae9496d1..e373db26c45 100644
--- a/pulsar-io/docs/pom.xml
+++ b/pulsar-io/docs/pom.xml
@@ -127,11 +127,6 @@
<artifactId>pulsar-io-hbase</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-io-hdfs2</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-hdfs3</artifactId>
diff --git a/pulsar-io/hdfs2/pom.xml b/pulsar-io/hdfs2/pom.xml
deleted file mode 100644
index d5fb33c170d..00000000000
--- a/pulsar-io/hdfs2/pom.xml
+++ /dev/null
@@ -1,130 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-io</artifactId>
- <version>4.0.0-SNAPSHOT</version>
- </parent>
- <artifactId>pulsar-io-hdfs2</artifactId>
- <name>Pulsar IO :: Hdfs2</name>
-
- <dependencies>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-io-core</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-yaml</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-collections4</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop2.version}</version>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-nar-maven-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>com.github.spotbugs</groupId>
- <artifactId>spotbugs-maven-plugin</artifactId>
- <version>${spotbugs-maven-plugin.version}</version>
- <configuration>
-
<excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
- </configuration>
- <executions>
- <execution>
- <id>spotbugs</id>
- <phase>verify</phase>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <profiles>
- <!--
- The only working way for OWASP dependency checker plugin
- to exclude module when failBuildOnCVSS is used
- in the root pom's plugin.
- -->
- <profile>
- <id>owasp-dependency-check</id>
- <build>
- <plugins>
- <plugin>
- <groupId>org.owasp</groupId>
- <artifactId>dependency-check-maven</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>aggregate</goal>
- </goals>
- <phase>none</phase>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-
-</project>
\ No newline at end of file
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConfig.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConfig.java
deleted file mode 100644
index 757360e0453..00000000000
---
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConfig.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2;
-
-import java.io.Serializable;
-import lombok.Data;
-import lombok.experimental.Accessors;
-import org.apache.commons.lang.StringUtils;
-
-/**
- * Configuration object for all HDFS components.
- */
-@Data
-@Accessors(chain = true)
-public abstract class AbstractHdfsConfig implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * A file or comma separated list of files which contains the Hadoop file
system configuration,
- * e.g. 'core-site.xml', 'hdfs-site.xml'.
- */
- private String hdfsConfigResources;
-
- /**
- * The HDFS directory from which files should be read from or written to.
- */
- private String directory;
-
- /**
- * The character encoding for the files, e.g. UTF-8, ASCII, etc.
- */
- private String encoding;
-
- /**
- * The compression codec used to compress/de-compress the files on HDFS.
- */
- private Compression compression;
-
- /**
- * The Kerberos user principal account to use for authentication.
- */
- private String kerberosUserPrincipal;
-
- /**
- * The full pathname to the Kerberos keytab file to use for authentication.
- */
- private String keytab;
-
- public void validate() {
- if (StringUtils.isEmpty(hdfsConfigResources) ||
StringUtils.isEmpty(directory)) {
- throw new IllegalArgumentException("Required property not set.");
- }
-
- if ((StringUtils.isNotEmpty(kerberosUserPrincipal) &&
StringUtils.isEmpty(keytab))
- || (StringUtils.isEmpty(kerberosUserPrincipal) &&
StringUtils.isNotEmpty(keytab))) {
- throw new IllegalArgumentException("Values for both
kerberosUserPrincipal & keytab are required.");
- }
- }
-}
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
deleted file mode 100644
index d7277aa6273..00000000000
---
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2;
-
-import java.io.IOException;
-import java.lang.ref.WeakReference;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.URI;
-import java.nio.charset.Charset;
-import java.security.PrivilegedExceptionAction;
-import java.util.Collections;
-import java.util.Map;
-import java.util.WeakHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.net.SocketFactory;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.pulsar.io.hdfs2.sink.HdfsSinkConfig;
-
-/**
- * A Simple abstract class for HDFS connectors.
- * Provides methods for connecting to HDFS
- */
-public abstract class AbstractHdfsConnector {
-
- private static final Object RESOURCES_LOCK = new Object();
-
- // Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
- protected final AtomicReference<HdfsResources> hdfsResources = new
AtomicReference<>();
- protected AbstractHdfsConfig connectorConfig;
- protected CompressionCodecFactory compressionCodecFactory;
-
- public AbstractHdfsConnector() {
- hdfsResources.set(new HdfsResources(null, null, null));
- }
-
- /*
- * Reset Hadoop Configuration and FileSystem based on the supplied
configuration resources.
- */
- protected HdfsResources resetHDFSResources(HdfsSinkConfig hdfsSinkConfig)
throws IOException {
- Configuration config = new ExtendedConfiguration();
- config.setClassLoader(Thread.currentThread().getContextClassLoader());
-
- getConfig(config, connectorConfig.getHdfsConfigResources());
-
- // first check for timeout on HDFS connection, because FileSystem has
a hard coded 15 minute timeout
- checkHdfsUriForTimeout(config);
-
- /* Disable caching of Configuration and FileSystem objects, else we
cannot reconfigure
- * the processor without a complete restart
- */
- String disableCacheName = String.format("fs.%s.impl.disable.cache",
- FileSystem.getDefaultUri(config).getScheme());
- config.set(disableCacheName, "true");
-
- // If kerberos is enabled, create the file system as the kerberos
principal
- // -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed
by only a single thread at at time
- FileSystem fs;
- UserGroupInformation ugi;
- synchronized (RESOURCES_LOCK) {
- if (SecurityUtil.isSecurityEnabled(config)) {
- ugi = SecurityUtil.loginKerberos(config,
- connectorConfig.getKerberosUserPrincipal(),
connectorConfig.getKeytab());
- fs = getFileSystemAsUser(config, ugi);
- } else {
- config.set("ipc.client.fallback-to-simple-auth-allowed",
"true");
- config.set("hadoop.security.authentication", "simple");
- ugi = SecurityUtil.loginSimple(config);
- fs = getFileSystemAsUser(config, ugi);
- }
- }
- return new HdfsResources(config, fs, ugi);
- }
-
- private static Configuration getConfig(final Configuration config, String
res) throws IOException {
- boolean foundResources = false;
- if (null != res) {
- String[] resources = res.split(",");
- for (String resource : resources) {
- config.addResource(new Path(resource.trim()));
- foundResources = true;
- }
- }
-
- if (!foundResources) {
- // check that at least 1 non-default resource is available on the
classpath
- String configStr = config.toString();
- for (String resource : configStr.substring(configStr.indexOf(":")
+ 1).split(",")) {
- if (!resource.contains("default") &&
config.getResource(resource.trim()) != null) {
- foundResources = true;
- break;
- }
- }
- }
-
- if (!foundResources) {
- throw new IOException("Could not find any of the " + res + " on
the classpath");
- }
- return config;
- }
-
- /*
- * Reduce the timeout of a socket connection from the default in
FileSystem.get()
- */
- protected void checkHdfsUriForTimeout(Configuration config) throws
IOException {
- URI hdfsUri = FileSystem.getDefaultUri(config);
- String address = hdfsUri.getAuthority();
- int port = hdfsUri.getPort();
- if (address == null || address.isEmpty() || port < 0) {
- return;
- }
- InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
- SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
- try (Socket socket = socketFactory.createSocket()) {
- NetUtils.connect(socket, namenode, 1000); // 1 second timeout
- }
- }
-
- /**
- * This exists in order to allow unit tests to override it so that they
don't take several
- * minutes waiting for UDP packets to be received.
- *
- * @param config
- * the configuration to use
- * @return the FileSystem that is created for the given Configuration
- * @throws IOException
- * if unable to create the FileSystem
- */
- protected FileSystem getFileSystem(final Configuration config) throws
IOException {
- return FileSystem.get(config);
- }
-
- protected FileSystem getFileSystemAsUser(final Configuration config,
UserGroupInformation ugi) throws IOException {
- try {
- return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () ->
FileSystem.get(config));
- } catch (InterruptedException e) {
- throw new IOException("Unable to create file system: " +
e.getMessage());
- }
- }
-
- protected Configuration getConfiguration() {
- return hdfsResources.get().getConfiguration();
- }
-
- protected FileSystem getFileSystem() {
- return hdfsResources.get().getFileSystem();
- }
-
- protected UserGroupInformation getUserGroupInformation() {
- return hdfsResources.get().getUserGroupInformation();
- }
-
- protected String getEncoding() {
- return StringUtils.isNotBlank(connectorConfig.getEncoding())
- ? connectorConfig.getEncoding() :
Charset.defaultCharset().name();
- }
-
- protected CompressionCodec getCompressionCodec() {
- if (connectorConfig.getCompression() == null) {
- return null;
- }
-
- CompressionCodec codec = getCompressionCodecFactory()
- .getCodecByName(connectorConfig.getCompression().name());
-
- return (codec != null) ? codec : new DefaultCodec();
- }
-
- protected CompressionCodecFactory getCompressionCodecFactory() {
- if (compressionCodecFactory == null) {
- compressionCodecFactory = new
CompressionCodecFactory(getConfiguration());
- }
-
- return compressionCodecFactory;
- }
-
- /**
- * Extending Hadoop Configuration to prevent it from caching classes that
can't be found. Since users may be
- * adding additional JARs to the classpath we don't want them to have to
restart the JVM to be able to load
- * something that was previously not found, but might now be available.
- * Reference the original getClassByNameOrNull from Configuration.
- */
- static class ExtendedConfiguration extends Configuration {
-
- private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>>
cacheClasses = new WeakHashMap<>();
-
- @Override
- public Class<?> getClassByNameOrNull(String name) {
- final ClassLoader classLoader = getClassLoader();
-
- Map<String, WeakReference<Class<?>>> map;
- synchronized (cacheClasses) {
- map = cacheClasses.get(classLoader);
- if (map == null) {
- map = Collections.synchronizedMap(new WeakHashMap<>());
- cacheClasses.put(classLoader, map);
- }
- }
-
- Class<?> clazz = null;
- WeakReference<Class<?>> ref = map.get(name);
- if (ref != null) {
- clazz = ref.get();
- }
-
- if (clazz == null) {
- try {
- clazz = Class.forName(name, true, classLoader);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- return null;
- }
- // two putters can race here, but they'll put the same class
- map.put(name, new WeakReference<>(clazz));
- return clazz;
- } else {
- // cache hit
- return clazz;
- }
- }
-
- }
-}
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/Compression.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/Compression.java
deleted file mode 100644
index 1e3d2f94904..00000000000
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/Compression.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2;
-
-/**
- * An enumeration of compression codecs available for HDFS.
- */
-public enum Compression {
- BZIP2, DEFLATE, GZIP, LZ4, SNAPPY, ZSTANDARD
-}
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/HdfsResources.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/HdfsResources.java
deleted file mode 100644
index 5fd6b283e6b..00000000000
---
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/HdfsResources.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * A wrapper class for HDFS resources.
- */
-public class HdfsResources {
-
- private final Configuration configuration;
- private final FileSystem fileSystem;
- private final UserGroupInformation userGroupInformation;
-
- public HdfsResources(Configuration config, FileSystem fs,
UserGroupInformation ugi) {
- this.configuration = config;
- this.fileSystem = fs;
- this.userGroupInformation = ugi;
- }
-
- public Configuration getConfiguration() {
- return configuration;
- }
-
- public FileSystem getFileSystem() {
- return fileSystem;
- }
-
- public UserGroupInformation getUserGroupInformation() {
- return userGroupInformation;
- }
-}
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/SecurityUtil.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/SecurityUtil.java
deleted file mode 100644
index ca178aad911..00000000000
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/SecurityUtil.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2;
-
-import java.io.IOException;
-import org.apache.commons.lang3.Validate;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * Provides synchronized access to UserGroupInformation to avoid multiple
processors/services from
- * interfering with each other.
- */
-public class SecurityUtil {
- public static final String HADOOP_SECURITY_AUTHENTICATION =
"hadoop.security.authentication";
- public static final String KERBEROS = "kerberos";
-
- /**
- * Initializes UserGroupInformation with the given Configuration and
performs the login for the
- * given principal and keytab. All logins should happen through this
class to ensure other threads
- * are not concurrently modifying UserGroupInformation.
- * <p/>
- * @param config the configuration instance
- * @param principal the principal to authenticate as
- * @param keyTab the keytab to authenticate with
- *
- * @return the UGI for the given principal
- *
- * @throws IOException if login failed
- */
- public static synchronized UserGroupInformation loginKerberos(final
Configuration config,
- final String principal, final String keyTab) throws IOException {
- Validate.notNull(config);
- Validate.notNull(principal);
- Validate.notNull(keyTab);
-
- UserGroupInformation.setConfiguration(config);
- UserGroupInformation.loginUserFromKeytab(principal.trim(),
keyTab.trim());
- return UserGroupInformation.getCurrentUser();
- }
-
- /**
- * Initializes UserGroupInformation with the given Configuration and
- * returns UserGroupInformation.getLoginUser(). All logins should happen
- * through this class to ensure other threads are not concurrently
- * modifying UserGroupInformation.
- *
- * @param config the configuration instance
- *
- * @return the UGI for the given principal
- *
- * @throws IOException if login failed
- */
- public static synchronized UserGroupInformation loginSimple(final
Configuration config) throws IOException {
- Validate.notNull(config);
- UserGroupInformation.setConfiguration(config);
- return UserGroupInformation.getLoginUser();
- }
-
- /**
- * Initializes UserGroupInformation with the given Configuration and
returns
- * UserGroupInformation.isSecurityEnabled().
- * All checks for isSecurityEnabled() should happen through this method.
- *
- * @param config the given configuration
- *
- * @return true if kerberos is enabled on the given configuration, false
otherwise
- *
- */
- public static boolean isSecurityEnabled(final Configuration config) {
- Validate.notNull(config);
- return
KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION));
- }
-}
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/package-info.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/package-info.java
deleted file mode 100644
index 464c6db341e..00000000000
--- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2;
\ No newline at end of file
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
deleted file mode 100644
index 7b025d16378..00000000000
---
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink;
-
-import java.io.IOException;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.hdfs2.AbstractHdfsConnector;
-import org.apache.pulsar.io.hdfs2.HdfsResources;
-
-/**
- * A Simple abstract class for HDFS sink.
- * Users need to implement extractKeyValue function to use this sink.
- */
-@Slf4j
-public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector
implements Sink<V> {
-
- protected HdfsSinkConfig hdfsSinkConfig;
- protected BlockingQueue<Record<V>> unackedRecords;
- protected HdfsSyncThread<V> syncThread;
- private Path path;
- private FSDataOutputStream hdfsStream;
- private DateTimeFormatter subdirectoryFormatter;
-
- public abstract KeyValue<K, V> extractKeyValue(Record<V> record);
- protected abstract void createWriter() throws IOException;
-
- @Override
- public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
- hdfsSinkConfig = HdfsSinkConfig.load(config);
- hdfsSinkConfig.validate();
- connectorConfig = hdfsSinkConfig;
- unackedRecords = new LinkedBlockingQueue<Record<V>>
(hdfsSinkConfig.getMaxPendingRecords());
- if (hdfsSinkConfig.getSubdirectoryPattern() != null) {
- subdirectoryFormatter =
DateTimeFormatter.ofPattern(hdfsSinkConfig.getSubdirectoryPattern());
- }
- connectToHdfs();
- createWriter();
- launchSyncThread();
- }
-
- @Override
- public void close() throws Exception {
- syncThread.halt();
- syncThread.join(0);
- }
-
- protected final void connectToHdfs() throws IOException {
- try {
- HdfsResources resources = hdfsResources.get();
-
- if (resources.getConfiguration() == null) {
- resources = this.resetHDFSResources(hdfsSinkConfig);
- hdfsResources.set(resources);
- }
- } catch (IOException ex) {
- hdfsResources.set(new HdfsResources(null, null, null));
- throw ex;
- }
- }
-
- protected FSDataOutputStream getHdfsStream() throws
IllegalArgumentException, IOException {
- if (hdfsStream == null) {
- Path path = getPath();
- FileSystem fs = getFileSystemAsUser(getConfiguration(),
getUserGroupInformation());
- hdfsStream = fs.exists(path) ? fs.append(path) : fs.create(path);
- }
- return hdfsStream;
- }
-
- protected final Path getPath() {
- if (path == null) {
- String ext = "";
- if (StringUtils.isNotBlank(hdfsSinkConfig.getFileExtension())) {
- ext = hdfsSinkConfig.getFileExtension();
- } else if (getCompressionCodec() != null) {
- ext = getCompressionCodec().getDefaultExtension();
- }
-
- String directory = hdfsSinkConfig.getDirectory();
- if (subdirectoryFormatter != null) {
- directory = FilenameUtils.concat(directory,
LocalDateTime.now().format(subdirectoryFormatter));
- }
- path = new Path(FilenameUtils.concat(directory,
- hdfsSinkConfig.getFilenamePrefix() + "-" +
System.currentTimeMillis() + ext));
- log.info("Create path: {}", path);
- }
- return path;
- }
-
- protected final void launchSyncThread() throws IOException {
- syncThread = new HdfsSyncThread<V>(getHdfsStream(), unackedRecords,
hdfsSinkConfig.getSyncInterval());
- syncThread.start();
- }
-}
\ No newline at end of file
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
deleted file mode 100644
index 9e1c6090fb5..00000000000
---
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Map;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.experimental.Accessors;
-import org.apache.commons.lang.StringUtils;
-import org.apache.pulsar.io.hdfs2.AbstractHdfsConfig;
-
-/**
- * Configuration object for all HDFS Sink components.
- */
-@Data
-@EqualsAndHashCode(callSuper = false)
-@Accessors(chain = true)
-public class HdfsSinkConfig extends AbstractHdfsConfig implements Serializable
{
-
- private static final long serialVersionUID = 1L;
-
- /**
- * The prefix of the files to create inside the HDFS directory, i.e. a
value of "topicA"
- * will result in files named topicA-, topicA-, etc being produced
- */
- private String filenamePrefix;
-
- /**
- * The extension to add to the files written to HDFS, e.g. '.txt', '.seq',
etc.
- */
- private String fileExtension;
-
- /**
- * The character to use to separate records in a text file. If no value is
provided
- * then the content from all of the records will be concatenated together
in one continuous
- * byte array.
- */
- private char separator;
-
- /**
- * The interval (in milliseconds) between calls to flush data to HDFS disk.
- */
- private long syncInterval;
-
- /**
- * The maximum number of records that we hold in memory before acking.
Default is Integer.MAX_VALUE.
- * Setting this value to one, results in every record being sent to disk
before the record is acked,
- * while setting it to a higher values allows us to buffer records before
flushing them all to disk.
- */
- private int maxPendingRecords = Integer.MAX_VALUE;
-
- /**
- * A subdirectory associated with the created time of the sink.
- * The pattern is the formatted pattern of {@link
AbstractHdfsConfig#getDirectory()}'s subdirectory.
- *
- * @see java.time.format.DateTimeFormatter for pattern's syntax
- */
- private String subdirectoryPattern;
-
- public static HdfsSinkConfig load(String yamlFile) throws IOException {
- ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
- return mapper.readValue(new File(yamlFile), HdfsSinkConfig.class);
- }
-
- public static HdfsSinkConfig load(Map<String, Object> map) throws
IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(mapper.writeValueAsString(map),
HdfsSinkConfig.class);
- }
-
- @Override
- public void validate() {
- super.validate();
- if ((StringUtils.isEmpty(fileExtension) && getCompression() == null)
- || StringUtils.isEmpty(filenamePrefix)) {
- throw new IllegalArgumentException("Required property not set.");
- }
-
- if (syncInterval < 0) {
- throw new IllegalArgumentException("Sync Interval cannot be
negative");
- }
-
- if (maxPendingRecords < 1) {
- throw new IllegalArgumentException("Max Pending Records must be a
positive integer");
- }
-
- if (subdirectoryPattern != null) {
- try {
- LocalDateTime.of(2020, 1, 1, 12,
0).format(DateTimeFormatter.ofPattern(subdirectoryPattern));
- } catch (Exception e) {
- throw new IllegalArgumentException(subdirectoryPattern + " is
not a valid pattern: " + e.getMessage());
- }
- }
- }
-}
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSyncThread.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSyncThread.java
deleted file mode 100644
index 9ddd83f4423..00000000000
---
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSyncThread.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.hadoop.fs.Syncable;
-import org.apache.pulsar.functions.api.Record;
-
-/**
- * A thread that runs in the background and acknowledges Records
- * after they have been written to disk.
- *
- * @param <V>
- */
-public class HdfsSyncThread<V> extends Thread {
-
- private final Syncable stream;
- private final BlockingQueue<Record<V>> unackedRecords;
- private final long syncInterval;
- private boolean keepRunning = true;
-
- public HdfsSyncThread(Syncable stream, BlockingQueue<Record<V>>
unackedRecords, long syncInterval) {
- this.stream = stream;
- this.unackedRecords = unackedRecords;
- this.syncInterval = syncInterval;
- }
-
- @Override
- public void run() {
- while (keepRunning) {
- try {
- Thread.sleep(syncInterval);
- ackRecords();
- } catch (InterruptedException e) {
- return;
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- public final void halt() throws IOException, InterruptedException {
- keepRunning = false;
- ackRecords();
- }
-
- private void ackRecords() throws IOException, InterruptedException {
-
- if (CollectionUtils.isEmpty(unackedRecords)) {
- return;
- }
-
- synchronized (stream) {
- stream.hsync();
- }
-
- while (!unackedRecords.isEmpty()) {
- unackedRecords.take().ack();
- }
- }
-}
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/package-info.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/package-info.java
deleted file mode 100644
index 238a441ee0e..00000000000
---
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink;
\ No newline at end of file
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java
deleted file mode 100644
index 355c00080ef..00000000000
---
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.seq;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.SequenceFile.Writer.Option;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * HDFS Sink that writes it contents to HDFS as Sequence Files.
- *
- * @param <K> - The incoming Key type
- * @param <V> - The incoming Value type
- * @param <HdfsK> - The HDFS Key type
- * @param <HdfsV> - The HDFS Value type
- */
-public abstract class HdfsAbstractSequenceFileSink<K, V, HdfsK, HdfsV>
- extends HdfsAbstractSink<K, V> implements Sink<V> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(HdfsAbstractSequenceFileSink.class);
-
- protected AtomicLong counter;
- protected FSDataOutputStream hdfsStream;
- protected Writer writer = null;
-
- public abstract KeyValue<HdfsK, HdfsV> convert(KeyValue<K, V> kv);
-
- @Override
- public void close() throws Exception {
- writer.close();
- super.close();
- }
-
- @Override
- protected void createWriter() throws IOException {
- writer = getWriter();
- }
-
- @Override
- public void write(Record<V> record) {
- try {
- KeyValue<K, V> kv = extractKeyValue(record);
- KeyValue<HdfsK, HdfsV> keyValue = convert(kv);
- writer.append(keyValue.getKey(), keyValue.getValue());
- unackedRecords.put(record);
- } catch (IOException | InterruptedException e) {
- LOG.error("Unable to write to file " + getPath(), e);
- record.fail();
- }
- }
-
- protected Writer getWriter() throws IOException {
- counter = new AtomicLong(0);
- List<Option> options = getOptions();
- return SequenceFile.createWriter(getConfiguration(),
- options.toArray(new Option[options.size()]));
- }
-
- protected List<Option> getOptions() throws IllegalArgumentException,
IOException {
- List<Option> list = new ArrayList<Option>();
- list.add(Writer.stream(getHdfsStream()));
-
- if (getCompressionCodec() != null) {
- list.add(Writer.compression(SequenceFile.CompressionType.RECORD,
getCompressionCodec()));
- }
- return list;
- }
-}
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
deleted file mode 100644
index 8fa908c3c8c..00000000000
---
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.seq;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.SequenceFile.Writer.Option;
-import org.apache.hadoop.io.Text;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-
-/**
- * This Sink should be used when the records are originating from a sequential
source,
- * and we want to retain the record sequence.This class uses the record's
sequence id as
- * the sequence id in the HDFS Sequence File if it is available, if not a
sequence id is
- * auto-generated for each new record.
- */
-public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long,
String, LongWritable, Text> {
-
- private AtomicLong counter;
-
- @Override
- public Writer getWriter() throws IOException {
- counter = new AtomicLong(0);
-
- return SequenceFile
- .createWriter(
- getConfiguration(),
- getOptions().toArray(new Option[getOptions().size()]));
- }
-
- @Override
- protected List<Option> getOptions() throws IllegalArgumentException,
IOException {
- List<Option> opts = super.getOptions();
- opts.add(Writer.keyClass(LongWritable.class));
- opts.add(Writer.valueClass(Text.class));
- return opts;
- }
-
- @Override
- public KeyValue<Long, String> extractKeyValue(Record<String> record) {
- Long sequence = record.getRecordSequence().orElseGet(() ->
counter.incrementAndGet());
- return new KeyValue<>(sequence, record.getValue());
- }
-
- @Override
- public KeyValue<LongWritable, Text> convert(KeyValue<Long, String> kv) {
- return new KeyValue<>(new LongWritable(kv.getKey()), new
Text(kv.getValue()));
- }
-}
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSink.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSink.java
deleted file mode 100644
index 3c643e287a9..00000000000
---
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSink.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.seq;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.SequenceFile.Writer.Option;
-import org.apache.hadoop.io.Text;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-
-/**
- * A Simple Sink class for Hdfs Sequence File.
- */
-public class HdfsTextSink extends
- HdfsAbstractSequenceFileSink<String, String, Text, Text> {
-
- @Override
- protected List<Option> getOptions() throws IllegalArgumentException,
IOException {
- List<Option> opts = super.getOptions();
- opts.add(Writer.keyClass(Text.class));
- opts.add(Writer.valueClass(Text.class));
- return opts;
- }
-
- @Override
- public KeyValue<String, String> extractKeyValue(Record<String> record) {
- String key = record.getKey().orElseGet(() -> record.getValue());
- return new KeyValue<>(key, record.getValue());
- }
-
- @Override
- public KeyValue<Text, Text> convert(KeyValue<String, String> kv) {
- return new KeyValue<>(new Text(kv.getKey()), new Text(kv.getValue()));
- }
-}
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/package-info.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/package-info.java
deleted file mode 100644
index 0bcb0d837fc..00000000000
---
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.seq;
\ No newline at end of file
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsAbstractTextFileSink.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsAbstractTextFileSink.java
deleted file mode 100644
index d96309695f1..00000000000
---
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsAbstractTextFileSink.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.text;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for HDFS Sinks that writes there contents to HDFS as Text Files.
- *
- * @param <K>
- * @param <V>
- */
-public abstract class HdfsAbstractTextFileSink<K, V> extends
HdfsAbstractSink<K, V> implements Sink<V> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(HdfsAbstractTextFileSink.class);
-
- protected OutputStreamWriter writer;
-
- @Override
- protected void createWriter() throws IOException {
- writer = new OutputStreamWriter(new
BufferedOutputStream(openHdfsStream()), getEncoding());
- }
-
- @Override
- public void close() throws Exception {
- writer.close();
- super.close();
- }
-
- @Override
- public void write(Record<V> record) {
- try {
- KeyValue<K, V> kv = extractKeyValue(record);
- writer.write(kv.getValue().toString());
-
- if (hdfsSinkConfig.getSeparator() != '\u0000') {
- writer.write(hdfsSinkConfig.getSeparator());
- }
- unackedRecords.put(record);
- } catch (IOException | InterruptedException e) {
- LOG.error("Unable to write to file " + getPath(), e);
- record.fail();
- }
- }
-
- private OutputStream openHdfsStream() throws IOException {
- if (hdfsSinkConfig.getCompression() != null) {
- return getCompressionCodec().createOutputStream(getHdfsStream());
- } else {
- return getHdfsStream();
- }
- }
-}
\ No newline at end of file
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSink.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSink.java
deleted file mode 100644
index 4de5de1aedf..00000000000
---
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSink.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.text;
-
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Sink;
-
-/**
- * A Simple Sink class for Hdfs Text File.
- */
-public class HdfsStringSink extends HdfsAbstractTextFileSink<String, String>
implements Sink<String> {
- @Override
- public KeyValue<String, String> extractKeyValue(Record<String> record) {
- String key = record.getKey().orElseGet(() -> record.getValue());
- return new KeyValue<>(key, record.getValue());
- }
-}
diff --git
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/package-info.java
b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/package-info.java
deleted file mode 100644
index a5618597197..00000000000
---
a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.text;
\ No newline at end of file
diff --git
a/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml
deleted file mode 100644
index 8bc33954c29..00000000000
--- a/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-name: hdfs2
-description: Writes data into HDFS 2.x
-sinkClass: org.apache.pulsar.io.hdfs2.sink.text.HdfsStringSink
-sinkConfigClass: org.apache.pulsar.io.hdfs2.sink.HdfsSinkConfig
diff --git a/pulsar-io/hdfs2/src/main/resources/findbugsExclude.xml
b/pulsar-io/hdfs2/src/main/resources/findbugsExclude.xml
deleted file mode 100644
index 980349c4d8d..00000000000
--- a/pulsar-io/hdfs2/src/main/resources/findbugsExclude.xml
+++ /dev/null
@@ -1,58 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<FindBugsFilter>
- <!-- Ignore violations that were present when the rule was enabled -->
- <Match>
- <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/>
- <Method name="getConfiguration"/>
- <Bug pattern="EI_EXPOSE_REP"/>
- </Match>
- <Match>
- <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/>
- <Method name="getFileSystem"/>
- <Bug pattern="EI_EXPOSE_REP"/>
- </Match>
- <Match>
- <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/>
- <Method name="getUserGroupInformation"/>
- <Bug pattern="EI_EXPOSE_REP"/>
- </Match>
- <Match>
- <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/>
- <Method name="<init>"/>
- <Bug pattern="EI_EXPOSE_REP2"/>
- </Match>
- <Match>
- <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/>
- <Method name="<init>"/>
- <Bug pattern="EI_EXPOSE_REP2"/>
- </Match>
- <Match>
- <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/>
- <Method name="<init>"/>
- <Bug pattern="EI_EXPOSE_REP2"/>
- </Match>
- <Match>
- <Class name="org.apache.pulsar.io.hdfs2.sink.HdfsSyncThread"/>
- <Method name="<init>"/>
- <Bug pattern="EI_EXPOSE_REP2"/>
- </Match>
-</FindBugsFilter>
diff --git a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java
deleted file mode 100644
index 2660199dc18..00000000000
--- a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
-
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink;
-import org.mockito.Mock;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.testng.annotations.BeforeMethod;
-
-/**
- * Simple base class for all the HDFS sink test cases.
- * Provides utility methods for sending records to the sink.
- *
- */
-public abstract class AbstractHdfsSinkTest<K, V> {
-
- @Mock
- protected SinkContext mockSinkContext;
-
- @Mock
- protected Record<V> mockRecord;
-
- protected Map<String, Object> map;
- protected HdfsAbstractSink<K, V> sink;
-
- @SuppressWarnings("unchecked")
- @BeforeMethod(alwaysRun = true)
- public final void setUp() throws Exception {
- map = new HashMap<String, Object> ();
- map.put("hdfsConfigResources", "../pulsar/pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml,"
- + "../pulsar/pulsar-io/hdfs2/src/test/resources/hadoop/hdfs-site.xml");
- map.put("directory", "/tmp/testing");
- map.put("filenamePrefix", "prefix");
-
- mockSinkContext = mock(SinkContext.class);
-
- mockRecord = mock(Record.class);
- when(mockRecord.getRecordSequence()).thenAnswer(new Answer<Optional<Long>>() {
- long sequenceCounter = 0;
- public Optional<Long> answer(InvocationOnMock invocation) throws Throwable {
- return Optional.of(sequenceCounter++);
- }});
-
- when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() {
- long sequenceCounter = 0;
- public Optional<String> answer(InvocationOnMock invocation) throws Throwable {
- return Optional.of( "key-" + sequenceCounter++);
- }});
-
- when(mockRecord.getValue()).thenAnswer(new Answer<String>() {
- long sequenceCounter = 0;
- public String answer(InvocationOnMock invocation) throws Throwable {
- return new String( "value-" + sequenceCounter++ + "-" + UUID.randomUUID());
- }});
-
- createSink();
- }
-
- protected abstract void createSink();
-
- protected final void send(int numRecords) throws Exception {
- for (int idx = 0; idx < numRecords; idx++) {
- sink.write(mockRecord);
- }
- }
-
- protected final void runFor(int numSeconds) throws InterruptedException {
- Producer producer = new Producer();
- producer.start();
- Thread.sleep(numSeconds * 1000); // Run for N seconds
- producer.halt();
- producer.join(2000);
- }
-
- protected final class Producer extends Thread {
- public boolean keepRunning = true;
- @Override
- public void run() {
- while (keepRunning)
- try {
- sink.write(mockRecord);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- public void halt() {
- keepRunning = false;
- }
-
- }
-}
diff --git
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
deleted file mode 100644
index 1d4b379011e..00000000000
--- a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.pulsar.io.hdfs2.Compression;
-import org.apache.pulsar.io.hdfs2.sink.HdfsSinkConfig;
-import org.testng.annotations.Test;
-
-import com.fasterxml.jackson.databind.exc.InvalidFormatException;
-
-
-public class HdfsSinkConfigTests {
-
- @Test
- public final void loadFromYamlFileTest() throws IOException {
- File yamlFile = getFile("sinkConfig.yaml");
- HdfsSinkConfig config = HdfsSinkConfig.load(yamlFile.getAbsolutePath());
- assertNotNull(config);
- assertEquals("core-site.xml", config.getHdfsConfigResources());
- assertEquals("/foo/bar", config.getDirectory());
- assertEquals("prefix", config.getFilenamePrefix());
- assertEquals(Compression.SNAPPY, config.getCompression());
- assertEquals("yyyy-MM-dd", config.getSubdirectoryPattern());
- }
-
- @Test
- public final void loadFromMapTest() throws IOException {
- Map<String, Object> map = new HashMap<String, Object> ();
- map.put("hdfsConfigResources", "core-site.xml");
- map.put("directory", "/foo/bar");
- map.put("filenamePrefix", "prefix");
- map.put("compression", "SNAPPY");
- map.put("subdirectoryPattern", "yy-MM-dd");
-
- HdfsSinkConfig config = HdfsSinkConfig.load(map);
- assertNotNull(config);
- assertEquals("core-site.xml", config.getHdfsConfigResources());
- assertEquals("/foo/bar", config.getDirectory());
- assertEquals("prefix", config.getFilenamePrefix());
- assertEquals(Compression.SNAPPY, config.getCompression());
- assertEquals("yy-MM-dd", config.getSubdirectoryPattern());
- }
-
- @Test
- public final void validValidateTest() throws IOException {
- Map<String, Object> map = new HashMap<String, Object> ();
- map.put("hdfsConfigResources", "core-site.xml");
- map.put("directory", "/foo/bar");
- map.put("filenamePrefix", "prefix");
- map.put("fileExtension", ".txt");
-
- HdfsSinkConfig config = HdfsSinkConfig.load(map);
- config.validate();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "Required property not set.")
- public final void missingDirectoryValidateTest() throws IOException {
- Map<String, Object> map = new HashMap<String, Object> ();
- map.put("hdfsConfigResources", "core-site.xml");
-
- HdfsSinkConfig config = HdfsSinkConfig.load(map);
- config.validate();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "Required property not set.")
- public final void missingHdfsConfigsValidateTest() throws IOException {
- Map<String, Object> map = new HashMap<String, Object> ();
- map.put("directory", "/foo/bar");
-
- HdfsSinkConfig config = HdfsSinkConfig.load(map);
- config.validate();
- }
-
- @Test(expectedExceptions = InvalidFormatException.class)
- public final void invalidCodecValidateTest() throws IOException {
- Map<String, Object> map = new HashMap<String, Object> ();
- map.put("hdfsConfigResources", "core-site.xml");
- map.put("directory", "/foo/bar");
- map.put("filenamePrefix", "prefix");
- map.put("fileExtension", ".txt");
- map.put("compression", "bad value");
-
- HdfsSinkConfig config = HdfsSinkConfig.load(map);
- config.validate();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "Sync Interval cannot be negative")
- public final void invalidSyncIntervalTest() throws IOException {
- Map<String, Object> map = new HashMap<String, Object> ();
- map.put("hdfsConfigResources", "core-site.xml");
- map.put("directory", "/foo/bar");
- map.put("filenamePrefix", "prefix");
- map.put("fileExtension", ".txt");
- map.put("syncInterval", -1);
-
- HdfsSinkConfig config = HdfsSinkConfig.load(map);
- config.validate();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "Max Pending Records must be a positive integer")
- public final void invalidMaxPendingRecordsTest() throws IOException {
- Map<String, Object> map = new HashMap<String, Object> ();
- map.put("hdfsConfigResources", "core-site.xml");
- map.put("directory", "/foo/bar");
- map.put("filenamePrefix", "prefix");
- map.put("fileExtension", ".txt");
- map.put("maxPendingRecords", 0);
-
- HdfsSinkConfig config = HdfsSinkConfig.load(map);
- config.validate();
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "Values for both kerberosUserPrincipal & keytab are required.")
- public final void kerberosValidateTest() throws IOException {
- Map<String, Object> map = new HashMap<String, Object> ();
- map.put("hdfsConfigResources", "core-site.xml");
- map.put("directory", "/foo/bar");
- map.put("filenamePrefix", "prefix");
- map.put("keytab", "/etc/keytab/hdfs.client.ktab");
-
- HdfsSinkConfig config = HdfsSinkConfig.load(map);
- config.validate();
- }
-
- private File getFile(String name) {
- ClassLoader classLoader = getClass().getClassLoader();
- return new File(classLoader.getResource(name).getFile());
- }
-}
diff --git
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialSinkTests.java
b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialSinkTests.java
deleted file mode 100644
index 66cfc5b547c..00000000000
--- a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialSinkTests.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.seq;
-
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertNotNull;
-import org.apache.pulsar.io.hdfs2.sink.AbstractHdfsSinkTest;
-import org.testng.SkipException;
-import org.testng.annotations.Test;
-
-public class HdfsSequentialSinkTests extends AbstractHdfsSinkTest<Long, String> {
-
- @Override
- protected void createSink() {
- sink = new HdfsSequentialTextSink();
- }
-
- @Test
- public final void write100Test() throws Exception {
- map.put("filenamePrefix", "write100Test-seq");
- map.put("fileExtension", ".seq");
- map.put("syncInterval", 1000);
- sink.open(map, mockSinkContext);
-
- assertNotNull(sink);
- send(100);
-
- Thread.sleep(2000);
- verify(mockRecord, times(100)).ack();
- sink.close();
- }
-
- @Test
- public final void write5000Test() throws Exception {
- map.put("filenamePrefix", "write5000Test-seq");
- map.put("fileExtension", ".seq");
- map.put("syncInterval", 1000);
- sink.open(map, mockSinkContext);
-
- assertNotNull(sink);
- send(5000);
-
- Thread.sleep(2000);
- verify(mockRecord, times(5000)).ack();
- sink.close();
- }
-
- @Test
- public final void tenSecondTest() throws Exception {
- map.put("filenamePrefix", "tenSecondTest-seq");
- map.put("fileExtension", ".seq");
- map.put("syncInterval", 1000);
- sink.open(map, mockSinkContext);
- runFor(10);
- sink.close();
- }
-
- @Test
- public final void bzip2CompressionTest() throws Exception {
- map.put("filenamePrefix", "bzip2CompressionTest-seq");
- map.put("compression", "BZIP2");
- map.remove("fileExtension");
- sink.open(map, mockSinkContext);
- send(5000);
- sink.close();
- verify(mockRecord, times(5000)).ack();
- }
-
- @Test
- public final void deflateCompressionTest() throws Exception {
- map.put("filenamePrefix", "deflateCompressionTest-seq");
- map.put("compression", "DEFLATE");
- map.remove("fileExtension");
- sink.open(map, mockSinkContext);
- send(5000);
- sink.close();
- verify(mockRecord, times(5000)).ack();
- }
-
- @Test
- public final void zStandardCompressionTest() throws Exception {
- if (System.getenv("LD_LIBRARY_PATH") == null) {
- throw new SkipException("Skip zStandardCompressionTest since LD_LIBRARY_PATH is not set");
- }
- map.put("filenamePrefix", "zStandardCompressionTest-seq");
- map.put("compression", "ZSTANDARD");
- map.remove("fileExtension");
- sink.open(map, mockSinkContext);
- send(5000);
- sink.close();
- verify(mockRecord, times(5000)).ack();
- }
-}
diff --git
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSinkTests.java
b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSinkTests.java
deleted file mode 100644
index a64345c4e11..00000000000
--- a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSinkTests.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.seq;
-
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertNotNull;
-import org.apache.pulsar.io.hdfs2.sink.AbstractHdfsSinkTest;
-import org.testng.SkipException;
-import org.testng.annotations.Test;
-
-public class HdfsTextSinkTests extends AbstractHdfsSinkTest<String, String> {
-
- @Override
- protected void createSink() {
- sink = new HdfsTextSink();
- }
-
- @Test
- public final void write100Test() throws Exception {
- map.put("filenamePrefix", "write100TestText-seq");
- map.put("fileExtension", ".seq");
- map.put("syncInterval", 1000);
- sink.open(map, mockSinkContext);
-
- assertNotNull(sink);
- assertNotNull(mockRecord);
- send(100);
-
- Thread.sleep(2000);
- verify(mockRecord, times(100)).ack();
- sink.close();
- }
-
- @Test
- public final void write5000Test() throws Exception {
- map.put("filenamePrefix", "write5000TestText-seq");
- map.put("fileExtension", ".seq");
- map.put("syncInterval", 1000);
- sink.open(map, mockSinkContext);
-
- assertNotNull(sink);
- assertNotNull(mockRecord);
- send(5000);
-
- Thread.sleep(2000);
- verify(mockRecord, times(5000)).ack();
- sink.close();
- }
-
- @Test
- public final void tenSecondTest() throws Exception {
- map.put("filenamePrefix", "tenSecondTestText-seq");
- map.put("fileExtension", ".seq");
- map.put("syncInterval", 1000);
- sink.open(map, mockSinkContext);
-
- assertNotNull(mockRecord);
-
- runFor(10);
- sink.close();
- }
-
- @Test
- public final void bzip2CompressionTest() throws Exception {
- map.put("filenamePrefix", "bzip2CompressionTestText-seq");
- map.put("compression", "BZIP2");
- map.remove("fileExtension");
- sink.open(map, mockSinkContext);
-
- assertNotNull(mockRecord);
-
- send(5000);
- sink.close();
- verify(mockRecord, times(5000)).ack();
- }
-
- @Test
- public final void deflateCompressionTest() throws Exception {
- map.put("filenamePrefix", "deflateCompressionTestText-seq");
- map.put("compression", "DEFLATE");
- map.remove("fileExtension");
- sink.open(map, mockSinkContext);
-
- assertNotNull(mockRecord);
- send(5000);
- sink.close();
- verify(mockRecord, times(5000)).ack();
- }
-
- @Test
- public final void zStandardCompressionTest() throws Exception {
- if (System.getenv("LD_LIBRARY_PATH") == null) {
- throw new SkipException("Skip zStandardCompressionTest since LD_LIBRARY_PATH is not set");
- }
- map.put("filenamePrefix", "zStandardCompressionTestText-seq");
- map.put("compression", "ZSTANDARD");
- map.remove("fileExtension");
- sink.open(map, mockSinkContext);
-
- assertNotNull(mockRecord);
- send(5000);
- sink.close();
- verify(mockRecord, times(5000)).ack();
- }
-}
diff --git
a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSinkTests.java
b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSinkTests.java
deleted file mode 100644
index 7bc5c15ee6a..00000000000
--- a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSinkTests.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.hdfs2.sink.text;
-
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import org.apache.pulsar.io.hdfs2.sink.AbstractHdfsSinkTest;
-import org.testng.SkipException;
-import org.testng.annotations.Test;
-
-public class HdfsStringSinkTests extends AbstractHdfsSinkTest<String, String> {
-
- @Override
- protected void createSink() {
- sink = new HdfsStringSink();
- }
-
- @Test
- public final void write5000Test() throws Exception {
- map.put("filenamePrefix", "write5000Test");
- map.put("fileExtension", ".txt");
- map.put("separator", '\n');
- sink.open(map, mockSinkContext);
- send(5000);
- sink.close();
- verify(mockRecord, times(5000)).ack();
- }
-
- @Test
- public final void fiveByTwoThousandTest() throws Exception {
- map.put("filenamePrefix", "fiveByTwoThousandTest");
- map.put("fileExtension", ".txt");
- map.put("separator", '\n');
- sink.open(map, mockSinkContext);
-
- for (int idx = 1; idx < 6; idx++) {
- send(2000);
- }
- sink.close();
- verify(mockRecord, times(2000 * 5)).ack();
- }
-
- @Test
- public final void tenSecondTest() throws Exception {
- map.put("filenamePrefix", "tenSecondTest");
- map.put("fileExtension", ".txt");
- map.put("separator", '\n');
- sink.open(map, mockSinkContext);
- runFor(10);
- sink.close();
- }
-
- @Test
- public final void maxPendingRecordsTest() throws Exception {
- map.put("filenamePrefix", "maxPendingRecordsTest");
- map.put("fileExtension", ".txt");
- map.put("separator", '\n');
- map.put("maxPendingRecords", 500);
- sink.open(map, mockSinkContext);
- runFor(10);
- sink.close();
- }
-
- @Test
- public final void bzip2CompressionTest() throws Exception {
- map.put("filenamePrefix", "bzip2CompressionTest");
- map.put("compression", "BZIP2");
- map.remove("fileExtension");
- map.put("separator", '\n');
- sink.open(map, mockSinkContext);
- send(5000);
- sink.close();
- verify(mockRecord, times(5000)).ack();
- }
-
- @Test
- public final void deflateCompressionTest() throws Exception {
- map.put("filenamePrefix", "deflateCompressionTest");
- map.put("compression", "DEFLATE");
- map.put("fileExtension", ".deflate");
- map.put("separator", '\n');
- sink.open(map, mockSinkContext);
- send(50000);
- sink.close();
- verify(mockRecord, times(50000)).ack();
- }
-
- @Test
- public final void zStandardCompressionTest() throws Exception {
- if (System.getenv("LD_LIBRARY_PATH") == null) {
- throw new SkipException("Skip zStandardCompressionTest since LD_LIBRARY_PATH is not set");
- }
- map.put("filenamePrefix", "zStandardCompressionTest");
- map.put("compression", "ZSTANDARD");
- map.put("fileExtension", ".zstandard");
- map.put("separator", '\n');
- sink.open(map, mockSinkContext);
- send(50000);
- sink.close();
- verify(mockRecord, times(50000)).ack();
- }
-}
diff --git a/pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml b/pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml
deleted file mode 100644
index 31d1e98c475..00000000000
--- a/pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
- <property>
- <name>fs.defaultFS</name>
- <value>hdfs://0.0.0.0:8020</value>
- </property>
- <property>
- <name>io.compression.codecs</name>
- <value>org.apache.hadoop.io.compress.GzipCodec,
- org.apache.hadoop.io.compress.DefaultCodec,
- org.apache.hadoop.io.compress.SnappyCodec</value>
- </property>
-</configuration>
diff --git a/pulsar-io/hdfs2/src/test/resources/hadoop/hdfs-site.xml b/pulsar-io/hdfs2/src/test/resources/hadoop/hdfs-site.xml
deleted file mode 100644
index bb722f1f634..00000000000
--- a/pulsar-io/hdfs2/src/test/resources/hadoop/hdfs-site.xml
+++ /dev/null
@@ -1,34 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-<configuration>
- <property>
- <name>dfs.replication</name>
- <value>1</value>
- </property>
- <property>
- <name>dfs.client.use.datanode.hostname</name>
- <value>true</value>
- </property>
- <property>
- <name>dfs.support.append</name>
- <value>true</value>
- </property>
-</configuration>
diff --git a/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml b/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
deleted file mode 100644
index 47ab4f9737a..00000000000
--- a/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
+++ /dev/null
@@ -1,26 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-{
-"hdfsConfigResources": "core-site.xml",
-"directory": "/foo/bar",
-"filenamePrefix": "prefix",
-"compression": "SNAPPY",
-"subdirectoryPattern": "yyyy-MM-dd"
-}
\ No newline at end of file
diff --git a/pulsar-io/hdfs3/pom.xml b/pulsar-io/hdfs3/pom.xml
index f5a87879db9..e6e4e6f9a9a 100644
--- a/pulsar-io/hdfs3/pom.xml
+++ b/pulsar-io/hdfs3/pom.xml
@@ -18,7 +18,8 @@
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.pulsar</groupId>
@@ -29,7 +30,7 @@
<name>Pulsar IO :: Hdfs3</name>
<dependencies>
- <dependency>
+ <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
@@ -50,29 +51,28 @@
<artifactId>commons-collections4</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop3.version}</version>
- <exclusions>
- <exclusion>
- <groupId>jakarta.activation</groupId>
- <artifactId>jakarta.activation-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>jakarta.activation</groupId>
+ <artifactId>jakarta.activation-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>jakarta.activation</groupId>
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 46e946f10f8..6630e1f84a1 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -63,7 +63,6 @@
<module>kafka-connect-adaptor</module>
<module>kafka-connect-adaptor-nar</module>
<module>debezium</module>
- <module>hdfs2</module>
<module>canal</module>
<module>file</module>
<module>netty</module>
@@ -97,7 +96,6 @@
<module>hdfs3</module>
<module>jdbc</module>
<module>data-generator</module>
- <module>hdfs2</module>
<module>canal</module>
<module>file</module>
<module>netty</module>
diff --git a/tiered-storage/file-system/pom.xml b/tiered-storage/file-system/pom.xml
index 1ec1c816fcc..e0a31108718 100644
--- a/tiered-storage/file-system/pom.xml
+++ b/tiered-storage/file-system/pom.xml
@@ -49,6 +49,10 @@
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>dnsjava</groupId>
+ <artifactId>dnsjava</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -77,6 +81,10 @@
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>dnsjava</groupId>
+ <artifactId>dnsjava</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<!-- fix hadoop-commons vulnerable dependencies -->
@@ -121,6 +129,10 @@
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>dnsjava</groupId>
+ <artifactId>dnsjava</artifactId>
+ </exclusion>
</exclusions>
</dependency>