This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 63611108ea6ae986ca27518cda5d2cc13aada8ee Author: Lari Hotari <[email protected]> AuthorDate: Fri Oct 11 19:22:35 2024 +0300 [fix][sec] Drop hdfs2 support, Upgrade hadoop3 to 3.4.0 and dnsjava to 3.6.2 to address CVE-2024-25638 (#23411) (cherry picked from commit d5e3675439458faa6f1d75929719d5a80e34238a) --- .github/workflows/pulsar-ci.yaml | 2 +- deployment/terraform-ansible/deploy-pulsar.yaml | 1 - distribution/io/src/assemble/io.xml | 1 - pom.xml | 60 ++++- pulsar-bom/pom.xml | 5 - pulsar-io/docs/pom.xml | 5 - pulsar-io/hdfs2/pom.xml | 130 ----------- .../apache/pulsar/io/hdfs2/AbstractHdfsConfig.java | 76 ------- .../pulsar/io/hdfs2/AbstractHdfsConnector.java | 246 --------------------- .../org/apache/pulsar/io/hdfs2/Compression.java | 26 --- .../org/apache/pulsar/io/hdfs2/HdfsResources.java | 51 ----- .../org/apache/pulsar/io/hdfs2/SecurityUtil.java | 90 -------- .../org/apache/pulsar/io/hdfs2/package-info.java | 19 -- .../pulsar/io/hdfs2/sink/HdfsAbstractSink.java | 124 ----------- .../pulsar/io/hdfs2/sink/HdfsSinkConfig.java | 117 ---------- .../pulsar/io/hdfs2/sink/HdfsSyncThread.java | 79 ------- .../apache/pulsar/io/hdfs2/sink/package-info.java | 19 -- .../sink/seq/HdfsAbstractSequenceFileSink.java | 95 -------- .../io/hdfs2/sink/seq/HdfsSequentialTextSink.java | 70 ------ .../pulsar/io/hdfs2/sink/seq/HdfsTextSink.java | 53 ----- .../pulsar/io/hdfs2/sink/seq/package-info.java | 19 -- .../hdfs2/sink/text/HdfsAbstractTextFileSink.java | 78 ------- .../pulsar/io/hdfs2/sink/text/HdfsStringSink.java | 34 --- .../pulsar/io/hdfs2/sink/text/package-info.java | 19 -- .../resources/META-INF/services/pulsar-io.yaml | 23 -- .../hdfs2/src/main/resources/findbugsExclude.xml | 58 ----- .../pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java | 120 ---------- .../pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java | 158 ------------- .../io/hdfs2/sink/seq/HdfsSequentialSinkTests.java | 110 --------- .../io/hdfs2/sink/seq/HdfsTextSinkTests.java | 122 ---------- .../io/hdfs2/sink/text/HdfsStringSinkTests.java | 118 ---------- .../hdfs2/src/test/resources/hadoop/core-site.xml | 32 --- .../hdfs2/src/test/resources/hadoop/hdfs-site.xml | 34 --- pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml | 26 --- pulsar-io/hdfs3/pom.xml | 50 ++--- pulsar-io/pom.xml | 2 - tiered-storage/file-system/pom.xml | 12 + 37 files changed, 94 insertions(+), 2190 deletions(-) diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml index d8e5374e45e..0b27c6963ed 100644 --- a/.github/workflows/pulsar-ci.yaml +++ b/.github/workflows/pulsar-ci.yaml @@ -1495,7 +1495,7 @@ jobs: - name: trigger dependency check run: | mvn -B -ntp verify -PskipDocker,skip-all,owasp-dependency-check -Dcheckstyle.skip=true -DskipTests \ - -pl '!distribution/server,!distribution/io,!distribution/offloaders,!tiered-storage/file-system,!pulsar-io/flume,!pulsar-io/hbase,!pulsar-io/hdfs2,!pulsar-io/hdfs3,!pulsar-io/docs,!pulsar-io/jdbc/openmldb' + -pl '!distribution/server,!distribution/io,!distribution/offloaders,!tiered-storage/file-system,!pulsar-io/flume,!pulsar-io/hbase,!pulsar-io/hdfs3,!pulsar-io/docs,!pulsar-io/jdbc/openmldb' - name: Upload report uses: actions/upload-artifact@v4 diff --git a/deployment/terraform-ansible/deploy-pulsar.yaml b/deployment/terraform-ansible/deploy-pulsar.yaml index db2fd1257ca..3a9f0fd942c 100644 --- a/deployment/terraform-ansible/deploy-pulsar.yaml +++ b/deployment/terraform-ansible/deploy-pulsar.yaml @@ -147,7 +147,6 @@ # - file # - flume # - hbase -# - hdfs2 # - hdfs3 # - influxdb # - jdbc-clickhouse diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml index f98ee14bb20..cf7731b4c85 100644 --- a/distribution/io/src/assemble/io.xml +++ b/distribution/io/src/assemble/io.xml @@ -63,7 +63,6 @@ <file><source>${basedir}/../../pulsar-io/kafka-connect-adaptor-nar/target/pulsar-io-kafka-connect-adaptor-${project.version}.nar</source></file> <file><source>${basedir}/../../pulsar-io/hbase/target/pulsar-io-hbase-${project.version}.nar</source></file> <file><source>${basedir}/../../pulsar-io/kinesis/target/pulsar-io-kinesis-${project.version}.nar</source></file> - <file><source>${basedir}/../../pulsar-io/hdfs2/target/pulsar-io-hdfs2-${project.version}.nar</source></file> <file><source>${basedir}/../../pulsar-io/hdfs3/target/pulsar-io-hdfs3-${project.version}.nar</source></file> <file><source>${basedir}/../../pulsar-io/file/target/pulsar-io-file-${project.version}.nar</source></file> <file><source>${basedir}/../../pulsar-io/data-generator/target/pulsar-io-data-generator-${project.version}.nar</source></file> diff --git a/pom.xml b/pom.xml index c8d639ddc49..4c1d7d80488 100644 --- a/pom.xml +++ b/pom.xml @@ -192,7 +192,6 @@ flexible messaging model and an intuitive client API.</description> <clickhouse-jdbc.version>0.4.6</clickhouse-jdbc.version> <mariadb-jdbc.version>2.7.5</mariadb-jdbc.version> <openmldb-jdbc.version>0.4.4-hotfix1</openmldb-jdbc.version> - <hdfs-offload-version3>3.3.5</hdfs-offload-version3> <json-smart.version>2.4.10</json-smart.version> <opensearch.version>1.2.4</opensearch.version> <elasticsearch-java.version>8.12.1</elasticsearch-java.version> @@ -203,9 +202,10 @@ flexible messaging model and an intuitive client API.</description> <wildfly-elytron.version>1.15.16.Final</wildfly-elytron.version> <jsonwebtoken.version>0.11.1</jsonwebtoken.version> <opencensus.version>0.28.0</opencensus.version> - <hadoop2.version>2.10.2</hadoop2.version> - <hadoop3.version>3.3.5</hadoop3.version> - <hbase.version>2.4.16</hbase.version> + <hadoop3.version>3.4.0</hadoop3.version> + <dnsjava3.version>3.6.2</dnsjava3.version> + <hdfs-offload-version3>${hadoop3.version}</hdfs-offload-version3> + <hbase.version>2.6.0-hadoop3</hbase.version> <guava.version>32.1.2-jre</guava.version> <jcip.version>1.0</jcip.version> <prometheus-jmx.version>0.16.1</prometheus-jmx.version> @@ -1296,6 +1296,58 @@ flexible messaging model and an intuitive client API.</description> <version>${commons.collections4.version}</version> </dependency> + <!-- support only hadoop 3 since hadoop 2 is EOL --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop3.version}</version> + <exclusions> + <exclusion> + <groupId>dnsjava</groupId> + <artifactId>dnsjava</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <version>${hadoop3.version}</version> + <exclusions> + <exclusion> + <groupId>dnsjava</groupId> + <artifactId>dnsjava</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>${hadoop3.version}</version> + <exclusions> + <exclusion> + <groupId>dnsjava</groupId> + <artifactId>dnsjava</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <version>${hbase.version}</version> + <exclusions> + <exclusion> + <groupId>dnsjava</groupId> + <artifactId>dnsjava</artifactId> + </exclusion> + </exclusions> + </dependency> + <!-- dnsjava is pulled in by hadoop-common --> + <dependency> + <groupId>dnsjava</groupId> + <artifactId>dnsjava</artifactId> + <version>${dnsjava3.version}</version> + </dependency> + <!-- test dependencies --> <dependency> <groupId>com.lmax</groupId> diff --git a/pulsar-bom/pom.xml b/pulsar-bom/pom.xml index f8db9e204d0..b7710fde427 100644 --- a/pulsar-bom/pom.xml +++ b/pulsar-bom/pom.xml @@ -495,11 +495,6 @@ <artifactId>pulsar-io-hbase</artifactId> <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.apache.pulsar</groupId> - <artifactId>pulsar-io-hdfs2</artifactId> - <version>${project.version}</version> - </dependency> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-io-hdfs3</artifactId> diff --git a/pulsar-io/docs/pom.xml b/pulsar-io/docs/pom.xml index be07fcd4642..dedf5137f44 100644 --- a/pulsar-io/docs/pom.xml +++ b/pulsar-io/docs/pom.xml @@ -127,11 +127,6 @@ <artifactId>pulsar-io-hbase</artifactId> <version>${project.version}</version> </dependency> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>pulsar-io-hdfs2</artifactId> - <version>${project.version}</version> - </dependency> <dependency> <groupId>${project.groupId}</groupId> <artifactId>pulsar-io-hdfs3</artifactId> diff --git a/pulsar-io/hdfs2/pom.xml b/pulsar-io/hdfs2/pom.xml deleted file mode 100644 index 55c026d7b05..00000000000 --- a/pulsar-io/hdfs2/pom.xml +++ /dev/null @@ -1,130 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.pulsar</groupId> - <artifactId>pulsar-io</artifactId> - <version>3.3.3-SNAPSHOT</version> - </parent> - <artifactId>pulsar-io-hdfs2</artifactId> - <name>Pulsar IO :: Hdfs2</name> - - <dependencies> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>pulsar-io-core</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.dataformat</groupId> - <artifactId>jackson-dataformat-yaml</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-collections4</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>${hadoop2.version}</version> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>*</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-nar-maven-plugin</artifactId> - </plugin> - <plugin> - <groupId>com.github.spotbugs</groupId> - <artifactId>spotbugs-maven-plugin</artifactId> - <version>${spotbugs-maven-plugin.version}</version> - <configuration> - <excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile> - </configuration> - <executions> - <execution> - <id>spotbugs</id> - <phase>verify</phase> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> - <profiles> - <!-- - The only working way for OWASP dependency checker plugin - to exclude module when failBuildOnCVSS is used - in the root pom's plugin. - --> - <profile> - <id>owasp-dependency-check</id> - <build> - <plugins> - <plugin> - <groupId>org.owasp</groupId> - <artifactId>dependency-check-maven</artifactId> - <executions> - <execution> - <goals> - <goal>aggregate</goal> - </goals> - <phase>none</phase> - </execution> - </executions> - </plugin> - </plugins> - </build> - </profile> - </profiles> - -</project> \ No newline at end of file diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConfig.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConfig.java deleted file mode 100644 index 757360e0453..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConfig.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2; - -import java.io.Serializable; -import lombok.Data; -import lombok.experimental.Accessors; -import org.apache.commons.lang.StringUtils; - -/** - * Configuration object for all HDFS components. - */ -@Data -@Accessors(chain = true) -public abstract class AbstractHdfsConfig implements Serializable { - - private static final long serialVersionUID = 1L; - - /** - * A file or comma separated list of files which contains the Hadoop file system configuration, - * e.g. 'core-site.xml', 'hdfs-site.xml'. - */ - private String hdfsConfigResources; - - /** - * The HDFS directory from which files should be read from or written to. - */ - private String directory; - - /** - * The character encoding for the files, e.g. UTF-8, ASCII, etc. - */ - private String encoding; - - /** - * The compression codec used to compress/de-compress the files on HDFS. - */ - private Compression compression; - - /** - * The Kerberos user principal account to use for authentication. - */ - private String kerberosUserPrincipal; - - /** - * The full pathname to the Kerberos keytab file to use for authentication. - */ - private String keytab; - - public void validate() { - if (StringUtils.isEmpty(hdfsConfigResources) || StringUtils.isEmpty(directory)) { - throw new IllegalArgumentException("Required property not set."); - } - - if ((StringUtils.isNotEmpty(kerberosUserPrincipal) && StringUtils.isEmpty(keytab)) - || (StringUtils.isEmpty(kerberosUserPrincipal) && StringUtils.isNotEmpty(keytab))) { - throw new IllegalArgumentException("Values for both kerberosUserPrincipal & keytab are required."); - } - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java deleted file mode 100644 index d7277aa6273..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2; - -import java.io.IOException; -import java.lang.ref.WeakReference; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.URI; -import java.nio.charset.Charset; -import java.security.PrivilegedExceptionAction; -import java.util.Collections; -import java.util.Map; -import java.util.WeakHashMap; -import java.util.concurrent.atomic.AtomicReference; -import javax.net.SocketFactory; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.io.compress.DefaultCodec; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.pulsar.io.hdfs2.sink.HdfsSinkConfig; - -/** - * A Simple abstract class for HDFS connectors. - * Provides methods for connecting to HDFS - */ -public abstract class AbstractHdfsConnector { - - private static final Object RESOURCES_LOCK = new Object(); - - // Hadoop Configuration, Filesystem, and UserGroupInformation (optional) - protected final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>(); - protected AbstractHdfsConfig connectorConfig; - protected CompressionCodecFactory compressionCodecFactory; - - public AbstractHdfsConnector() { - hdfsResources.set(new HdfsResources(null, null, null)); - } - - /* - * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources. - */ - protected HdfsResources resetHDFSResources(HdfsSinkConfig hdfsSinkConfig) throws IOException { - Configuration config = new ExtendedConfiguration(); - config.setClassLoader(Thread.currentThread().getContextClassLoader()); - - getConfig(config, connectorConfig.getHdfsConfigResources()); - - // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout - checkHdfsUriForTimeout(config); - - /* Disable caching of Configuration and FileSystem objects, else we cannot reconfigure - * the processor without a complete restart - */ - String disableCacheName = String.format("fs.%s.impl.disable.cache", - FileSystem.getDefaultUri(config).getScheme()); - config.set(disableCacheName, "true"); - - // If kerberos is enabled, create the file system as the kerberos principal - // -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time - FileSystem fs; - UserGroupInformation ugi; - synchronized (RESOURCES_LOCK) { - if (SecurityUtil.isSecurityEnabled(config)) { - ugi = SecurityUtil.loginKerberos(config, - connectorConfig.getKerberosUserPrincipal(), connectorConfig.getKeytab()); - fs = getFileSystemAsUser(config, ugi); - } else { - config.set("ipc.client.fallback-to-simple-auth-allowed", "true"); - config.set("hadoop.security.authentication", "simple"); - ugi = SecurityUtil.loginSimple(config); - fs = getFileSystemAsUser(config, ugi); - } - } - return new HdfsResources(config, fs, ugi); - } - - private static Configuration getConfig(final Configuration config, String res) throws IOException { - boolean foundResources = false; - if (null != res) { - String[] resources = res.split(","); - for (String resource : resources) { - config.addResource(new Path(resource.trim())); - foundResources = true; - } - } - - if (!foundResources) { - // check that at least 1 non-default resource is available on the classpath - String configStr = config.toString(); - for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) { - if (!resource.contains("default") && config.getResource(resource.trim()) != null) { - foundResources = true; - break; - } - } - } - - if (!foundResources) { - throw new IOException("Could not find any of the " + res + " on the classpath"); - } - return config; - } - - /* - * Reduce the timeout of a socket connection from the default in FileSystem.get() - */ - protected void checkHdfsUriForTimeout(Configuration config) throws IOException { - URI hdfsUri = FileSystem.getDefaultUri(config); - String address = hdfsUri.getAuthority(); - int port = hdfsUri.getPort(); - if (address == null || address.isEmpty() || port < 0) { - return; - } - InetSocketAddress namenode = NetUtils.createSocketAddr(address, port); - SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config); - try (Socket socket = socketFactory.createSocket()) { - NetUtils.connect(socket, namenode, 1000); // 1 second timeout - } - } - - /** - * This exists in order to allow unit tests to override it so that they don't take several - * minutes waiting for UDP packets to be received. - * - * @param config - * the configuration to use - * @return the FileSystem that is created for the given Configuration - * @throws IOException - * if unable to create the FileSystem - */ - protected FileSystem getFileSystem(final Configuration config) throws IOException { - return FileSystem.get(config); - } - - protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException { - try { - return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> FileSystem.get(config)); - } catch (InterruptedException e) { - throw new IOException("Unable to create file system: " + e.getMessage()); - } - } - - protected Configuration getConfiguration() { - return hdfsResources.get().getConfiguration(); - } - - protected FileSystem getFileSystem() { - return hdfsResources.get().getFileSystem(); - } - - protected UserGroupInformation getUserGroupInformation() { - return hdfsResources.get().getUserGroupInformation(); - } - - protected String getEncoding() { - return StringUtils.isNotBlank(connectorConfig.getEncoding()) - ? connectorConfig.getEncoding() : Charset.defaultCharset().name(); - } - - protected CompressionCodec getCompressionCodec() { - if (connectorConfig.getCompression() == null) { - return null; - } - - CompressionCodec codec = getCompressionCodecFactory() - .getCodecByName(connectorConfig.getCompression().name()); - - return (codec != null) ? codec : new DefaultCodec(); - } - - protected CompressionCodecFactory getCompressionCodecFactory() { - if (compressionCodecFactory == null) { - compressionCodecFactory = new CompressionCodecFactory(getConfiguration()); - } - - return compressionCodecFactory; - } - - /** - * Extending Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be - * adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load - * something that was previously not found, but might now be available. - * Reference the original getClassByNameOrNull from Configuration. - */ - static class ExtendedConfiguration extends Configuration { - - private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> cacheClasses = new WeakHashMap<>(); - - @Override - public Class<?> getClassByNameOrNull(String name) { - final ClassLoader classLoader = getClassLoader(); - - Map<String, WeakReference<Class<?>>> map; - synchronized (cacheClasses) { - map = cacheClasses.get(classLoader); - if (map == null) { - map = Collections.synchronizedMap(new WeakHashMap<>()); - cacheClasses.put(classLoader, map); - } - } - - Class<?> clazz = null; - WeakReference<Class<?>> ref = map.get(name); - if (ref != null) { - clazz = ref.get(); - } - - if (clazz == null) { - try { - clazz = Class.forName(name, true, classLoader); - } catch (ClassNotFoundException | NoClassDefFoundError e) { - return null; - } - // two putters can race here, but they'll put the same class - map.put(name, new WeakReference<>(clazz)); - return clazz; - } else { - // cache hit - return clazz; - } - } - - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/Compression.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/Compression.java deleted file mode 100644 index 1e3d2f94904..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/Compression.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2; - -/** - * An enumeration of compression codecs available for HDFS. - */ -public enum Compression { - BZIP2, DEFLATE, GZIP, LZ4, SNAPPY, ZSTANDARD -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/HdfsResources.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/HdfsResources.java deleted file mode 100644 index 5fd6b283e6b..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/HdfsResources.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.security.UserGroupInformation; - -/** - * A wrapper class for HDFS resources. - */ -public class HdfsResources { - - private final Configuration configuration; - private final FileSystem fileSystem; - private final UserGroupInformation userGroupInformation; - - public HdfsResources(Configuration config, FileSystem fs, UserGroupInformation ugi) { - this.configuration = config; - this.fileSystem = fs; - this.userGroupInformation = ugi; - } - - public Configuration getConfiguration() { - return configuration; - } - - public FileSystem getFileSystem() { - return fileSystem; - } - - public UserGroupInformation getUserGroupInformation() { - return userGroupInformation; - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/SecurityUtil.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/SecurityUtil.java deleted file mode 100644 index ca178aad911..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/SecurityUtil.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2; - -import java.io.IOException; -import org.apache.commons.lang3.Validate; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; - -/** - * Provides synchronized access to UserGroupInformation to avoid multiple processors/services from - * interfering with each other. - */ -public class SecurityUtil { - public static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication"; - public static final String KERBEROS = "kerberos"; - - /** - * Initializes UserGroupInformation with the given Configuration and performs the login for the - * given principal and keytab. All logins should happen through this class to ensure other threads - * are not concurrently modifying UserGroupInformation. - * <p/> - * @param config the configuration instance - * @param principal the principal to authenticate as - * @param keyTab the keytab to authenticate with - * - * @return the UGI for the given principal - * - * @throws IOException if login failed - */ - public static synchronized UserGroupInformation loginKerberos(final Configuration config, - final String principal, final String keyTab) throws IOException { - Validate.notNull(config); - Validate.notNull(principal); - Validate.notNull(keyTab); - - UserGroupInformation.setConfiguration(config); - UserGroupInformation.loginUserFromKeytab(principal.trim(), keyTab.trim()); - return UserGroupInformation.getCurrentUser(); - } - - /** - * Initializes UserGroupInformation with the given Configuration and - * returns UserGroupInformation.getLoginUser(). All logins should happen - * through this class to ensure other threads are not concurrently - * modifying UserGroupInformation. - * - * @param config the configuration instance - * - * @return the UGI for the given principal - * - * @throws IOException if login failed - */ - public static synchronized UserGroupInformation loginSimple(final Configuration config) throws IOException { - Validate.notNull(config); - UserGroupInformation.setConfiguration(config); - return UserGroupInformation.getLoginUser(); - } - - /** - * Initializes UserGroupInformation with the given Configuration and returns - * UserGroupInformation.isSecurityEnabled(). - * All checks for isSecurityEnabled() should happen through this method. - * - * @param config the given configuration - * - * @return true if kerberos is enabled on the given configuration, false otherwise - * - */ - public static boolean isSecurityEnabled(final Configuration config) { - Validate.notNull(config); - return KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION)); - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/package-info.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/package-info.java deleted file mode 100644 index 464c6db341e..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/package-info.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2; \ No newline at end of file diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java deleted file mode 100644 index 7b025d16378..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink; - -import java.io.IOException; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.FilenameUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.KeyValue; -import org.apache.pulsar.io.core.Sink; -import org.apache.pulsar.io.core.SinkContext; -import org.apache.pulsar.io.hdfs2.AbstractHdfsConnector; -import org.apache.pulsar.io.hdfs2.HdfsResources; - -/** - * A Simple abstract class for HDFS sink. - * Users need to implement extractKeyValue function to use this sink. - */ -@Slf4j -public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector implements Sink<V> { - - protected HdfsSinkConfig hdfsSinkConfig; - protected BlockingQueue<Record<V>> unackedRecords; - protected HdfsSyncThread<V> syncThread; - private Path path; - private FSDataOutputStream hdfsStream; - private DateTimeFormatter subdirectoryFormatter; - - public abstract KeyValue<K, V> extractKeyValue(Record<V> record); - protected abstract void createWriter() throws IOException; - - @Override - public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { - hdfsSinkConfig = HdfsSinkConfig.load(config); - hdfsSinkConfig.validate(); - connectorConfig = hdfsSinkConfig; - unackedRecords = new LinkedBlockingQueue<Record<V>> (hdfsSinkConfig.getMaxPendingRecords()); - if (hdfsSinkConfig.getSubdirectoryPattern() != null) { - subdirectoryFormatter = DateTimeFormatter.ofPattern(hdfsSinkConfig.getSubdirectoryPattern()); - } - connectToHdfs(); - createWriter(); - launchSyncThread(); - } - - @Override - public void close() throws Exception { - syncThread.halt(); - syncThread.join(0); - } - - protected final void connectToHdfs() throws IOException { - try { - HdfsResources resources = hdfsResources.get(); - - if (resources.getConfiguration() == null) { - resources = this.resetHDFSResources(hdfsSinkConfig); - hdfsResources.set(resources); - } - } catch (IOException ex) { - hdfsResources.set(new HdfsResources(null, null, null)); - throw ex; - } - } - - protected FSDataOutputStream getHdfsStream() throws IllegalArgumentException, IOException { - if (hdfsStream == null) { - Path path = getPath(); - FileSystem fs = getFileSystemAsUser(getConfiguration(), getUserGroupInformation()); - hdfsStream = fs.exists(path) ? fs.append(path) : fs.create(path); - } - return hdfsStream; - } - - protected final Path getPath() { - if (path == null) { - String ext = ""; - if (StringUtils.isNotBlank(hdfsSinkConfig.getFileExtension())) { - ext = hdfsSinkConfig.getFileExtension(); - } else if (getCompressionCodec() != null) { - ext = getCompressionCodec().getDefaultExtension(); - } - - String directory = hdfsSinkConfig.getDirectory(); - if (subdirectoryFormatter != null) { - directory = FilenameUtils.concat(directory, LocalDateTime.now().format(subdirectoryFormatter)); - } - path = new Path(FilenameUtils.concat(directory, - hdfsSinkConfig.getFilenamePrefix() + "-" + System.currentTimeMillis() + ext)); - log.info("Create path: {}", path); - } - return path; - } - - protected final void launchSyncThread() throws IOException { - syncThread = new HdfsSyncThread<V>(getHdfsStream(), unackedRecords, hdfsSinkConfig.getSyncInterval()); - syncThread.start(); - } -} \ No newline at end of file diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java deleted file mode 100644 index 9e1c6090fb5..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Map; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.experimental.Accessors; -import org.apache.commons.lang.StringUtils; -import org.apache.pulsar.io.hdfs2.AbstractHdfsConfig; - -/** - * Configuration object for all HDFS Sink components. - */ -@Data -@EqualsAndHashCode(callSuper = false) -@Accessors(chain = true) -public class HdfsSinkConfig extends AbstractHdfsConfig implements Serializable { - - private static final long serialVersionUID = 1L; - - /** - * The prefix of the files to create inside the HDFS directory, i.e. a value of "topicA" - * will result in files named topicA-, topicA-, etc being produced - */ - private String filenamePrefix; - - /** - * The extension to add to the files written to HDFS, e.g. '.txt', '.seq', etc. - */ - private String fileExtension; - - /** - * The character to use to separate records in a text file. If no value is provided - * then the content from all of the records will be concatenated together in one continuous - * byte array. - */ - private char separator; - - /** - * The interval (in milliseconds) between calls to flush data to HDFS disk. - */ - private long syncInterval; - - /** - * The maximum number of records that we hold in memory before acking. Default is Integer.MAX_VALUE. - * Setting this value to one, results in every record being sent to disk before the record is acked, - * while setting it to a higher values allows us to buffer records before flushing them all to disk. - */ - private int maxPendingRecords = Integer.MAX_VALUE; - - /** - * A subdirectory associated with the created time of the sink. - * The pattern is the formatted pattern of {@link AbstractHdfsConfig#getDirectory()}'s subdirectory. - * - * @see java.time.format.DateTimeFormatter for pattern's syntax - */ - private String subdirectoryPattern; - - public static HdfsSinkConfig load(String yamlFile) throws IOException { - ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - return mapper.readValue(new File(yamlFile), HdfsSinkConfig.class); - } - - public static HdfsSinkConfig load(Map<String, Object> map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(mapper.writeValueAsString(map), HdfsSinkConfig.class); - } - - @Override - public void validate() { - super.validate(); - if ((StringUtils.isEmpty(fileExtension) && getCompression() == null) - || StringUtils.isEmpty(filenamePrefix)) { - throw new IllegalArgumentException("Required property not set."); - } - - if (syncInterval < 0) { - throw new IllegalArgumentException("Sync Interval cannot be negative"); - } - - if (maxPendingRecords < 1) { - throw new IllegalArgumentException("Max Pending Records must be a positive integer"); - } - - if (subdirectoryPattern != null) { - try { - LocalDateTime.of(2020, 1, 1, 12, 0).format(DateTimeFormatter.ofPattern(subdirectoryPattern)); - } catch (Exception e) { - throw new IllegalArgumentException(subdirectoryPattern + " is not a valid pattern: " + e.getMessage()); - } - } - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSyncThread.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSyncThread.java deleted file mode 100644 index 9ddd83f4423..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSyncThread.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink; - -import java.io.IOException; -import java.util.concurrent.BlockingQueue; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.hadoop.fs.Syncable; -import org.apache.pulsar.functions.api.Record; - -/** - * A thread that runs in the background and acknowledges Records - * after they have been written to disk. - * - * @param <V> - */ -public class HdfsSyncThread<V> extends Thread { - - private final Syncable stream; - private final BlockingQueue<Record<V>> unackedRecords; - private final long syncInterval; - private boolean keepRunning = true; - - public HdfsSyncThread(Syncable stream, BlockingQueue<Record<V>> unackedRecords, long syncInterval) { - this.stream = stream; - this.unackedRecords = unackedRecords; - this.syncInterval = syncInterval; - } - - @Override - public void run() { - while (keepRunning) { - try { - Thread.sleep(syncInterval); - ackRecords(); - } catch (InterruptedException e) { - return; - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - public final void halt() throws IOException, InterruptedException { - keepRunning = false; - ackRecords(); - } - - private void ackRecords() throws IOException, InterruptedException { - - if (CollectionUtils.isEmpty(unackedRecords)) { - return; - } - - synchronized (stream) { - stream.hsync(); - } - - while (!unackedRecords.isEmpty()) { - unackedRecords.take().ack(); - } - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/package-info.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/package-info.java deleted file mode 100644 index 238a441ee0e..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/package-info.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink; \ No newline at end of file diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java deleted file mode 100644 index 355c00080ef..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink.seq; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.io.SequenceFile.Writer.Option; -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.KeyValue; -import org.apache.pulsar.io.core.Sink; -import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * HDFS Sink that writes it contents to HDFS as Sequence Files. - * - * @param <K> - The incoming Key type - * @param <V> - The incoming Value type - * @param <HdfsK> - The HDFS Key type - * @param <HdfsV> - The HDFS Value type - */ -public abstract class HdfsAbstractSequenceFileSink<K, V, HdfsK, HdfsV> - extends HdfsAbstractSink<K, V> implements Sink<V> { - - private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractSequenceFileSink.class); - - protected AtomicLong counter; - protected FSDataOutputStream hdfsStream; - protected Writer writer = null; - - public abstract KeyValue<HdfsK, HdfsV> convert(KeyValue<K, V> kv); - - @Override - public void close() throws Exception { - writer.close(); - super.close(); - } - - @Override - protected void createWriter() throws IOException { - writer = getWriter(); - } - - @Override - public void write(Record<V> record) { - try { - KeyValue<K, V> kv = extractKeyValue(record); - KeyValue<HdfsK, HdfsV> keyValue = convert(kv); - writer.append(keyValue.getKey(), keyValue.getValue()); - unackedRecords.put(record); - } catch (IOException | InterruptedException e) { - LOG.error("Unable to write to file " + getPath(), e); - record.fail(); - } - } - - protected Writer getWriter() throws IOException { - counter = new AtomicLong(0); - List<Option> options = getOptions(); - return SequenceFile.createWriter(getConfiguration(), - options.toArray(new Option[options.size()])); - } - - protected List<Option> getOptions() throws IllegalArgumentException, IOException { - List<Option> list = new ArrayList<Option>(); - list.add(Writer.stream(getHdfsStream())); - - if (getCompressionCodec() != null) { - list.add(Writer.compression(SequenceFile.CompressionType.RECORD, getCompressionCodec())); - } - return list; - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java deleted file mode 100644 index 8fa908c3c8c..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink.seq; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.io.SequenceFile.Writer.Option; -import org.apache.hadoop.io.Text; -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.KeyValue; - -/** - * This Sink should be used when the records are originating from a sequential source, - * and we want to retain the record sequence.This class uses the record's sequence id as - * the sequence id in the HDFS Sequence File if it is available, if not a sequence id is - * auto-generated for each new record. - */ -public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, String, LongWritable, Text> { - - private AtomicLong counter; - - @Override - public Writer getWriter() throws IOException { - counter = new AtomicLong(0); - - return SequenceFile - .createWriter( - getConfiguration(), - getOptions().toArray(new Option[getOptions().size()])); - } - - @Override - protected List<Option> getOptions() throws IllegalArgumentException, IOException { - List<Option> opts = super.getOptions(); - opts.add(Writer.keyClass(LongWritable.class)); - opts.add(Writer.valueClass(Text.class)); - return opts; - } - - @Override - public KeyValue<Long, String> extractKeyValue(Record<String> record) { - Long sequence = record.getRecordSequence().orElseGet(() -> counter.incrementAndGet()); - return new KeyValue<>(sequence, record.getValue()); - } - - @Override - public KeyValue<LongWritable, Text> convert(KeyValue<Long, String> kv) { - return new KeyValue<>(new LongWritable(kv.getKey()), new Text(kv.getValue())); - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSink.java deleted file mode 100644 index 3c643e287a9..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSink.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink.seq; - -import java.io.IOException; -import java.util.List; -import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.io.SequenceFile.Writer.Option; -import org.apache.hadoop.io.Text; -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.KeyValue; - -/** - * A Simple Sink class for Hdfs Sequence File. - */ -public class HdfsTextSink extends - HdfsAbstractSequenceFileSink<String, String, Text, Text> { - - @Override - protected List<Option> getOptions() throws IllegalArgumentException, IOException { - List<Option> opts = super.getOptions(); - opts.add(Writer.keyClass(Text.class)); - opts.add(Writer.valueClass(Text.class)); - return opts; - } - - @Override - public KeyValue<String, String> extractKeyValue(Record<String> record) { - String key = record.getKey().orElseGet(() -> record.getValue()); - return new KeyValue<>(key, record.getValue()); - } - - @Override - public KeyValue<Text, Text> convert(KeyValue<String, String> kv) { - return new KeyValue<>(new Text(kv.getKey()), new Text(kv.getValue())); - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/package-info.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/package-info.java deleted file mode 100644 index 0bcb0d837fc..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/package-info.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink.seq; \ No newline at end of file diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsAbstractTextFileSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsAbstractTextFileSink.java deleted file mode 100644 index d96309695f1..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsAbstractTextFileSink.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink.text; - -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.KeyValue; -import org.apache.pulsar.io.core.Sink; -import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Base class for HDFS Sinks that writes there contents to HDFS as Text Files. - * - * @param <K> - * @param <V> - */ -public abstract class HdfsAbstractTextFileSink<K, V> extends HdfsAbstractSink<K, V> implements Sink<V> { - - private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractTextFileSink.class); - - protected OutputStreamWriter writer; - - @Override - protected void createWriter() throws IOException { - writer = new OutputStreamWriter(new BufferedOutputStream(openHdfsStream()), getEncoding()); - } - - @Override - public void close() throws Exception { - writer.close(); - super.close(); - } - - @Override - public void write(Record<V> record) { - try { - KeyValue<K, V> kv = extractKeyValue(record); - writer.write(kv.getValue().toString()); - - if (hdfsSinkConfig.getSeparator() != '\u0000') { - writer.write(hdfsSinkConfig.getSeparator()); - } - unackedRecords.put(record); - } catch (IOException | InterruptedException e) { - LOG.error("Unable to write to file " + getPath(), e); - record.fail(); - } - } - - private OutputStream openHdfsStream() throws IOException { - if (hdfsSinkConfig.getCompression() != null) { - return getCompressionCodec().createOutputStream(getHdfsStream()); - } else { - return getHdfsStream(); - } - } -} \ No newline at end of file diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSink.java deleted file mode 100644 index 4de5de1aedf..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSink.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink.text; - -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.KeyValue; -import org.apache.pulsar.io.core.Sink; - -/** - * A Simple Sink class for Hdfs Text File. - */ -public class HdfsStringSink extends HdfsAbstractTextFileSink<String, String> implements Sink<String> { - @Override - public KeyValue<String, String> extractKeyValue(Record<String> record) { - String key = record.getKey().orElseGet(() -> record.getValue()); - return new KeyValue<>(key, record.getValue()); - } -} diff --git a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/package-info.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/package-info.java deleted file mode 100644 index a5618597197..00000000000 --- a/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/package-info.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink.text; \ No newline at end of file diff --git a/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml deleted file mode 100644 index 8bc33954c29..00000000000 --- a/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml +++ /dev/null @@ -1,23 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -name: hdfs2 -description: Writes data into HDFS 2.x -sinkClass: org.apache.pulsar.io.hdfs2.sink.text.HdfsStringSink -sinkConfigClass: org.apache.pulsar.io.hdfs2.sink.HdfsSinkConfig diff --git a/pulsar-io/hdfs2/src/main/resources/findbugsExclude.xml b/pulsar-io/hdfs2/src/main/resources/findbugsExclude.xml deleted file mode 100644 index 980349c4d8d..00000000000 --- a/pulsar-io/hdfs2/src/main/resources/findbugsExclude.xml +++ /dev/null @@ -1,58 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<FindBugsFilter> - <!-- Ignore violations that were present when the rule was enabled --> - <Match> - <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/> - <Method name="getConfiguration"/> - <Bug pattern="EI_EXPOSE_REP"/> - </Match> - <Match> - <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/> - <Method name="getFileSystem"/> - <Bug pattern="EI_EXPOSE_REP"/> - </Match> - <Match> - <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/> - <Method name="getUserGroupInformation"/> - <Bug pattern="EI_EXPOSE_REP"/> - </Match> - <Match> - <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/> - <Method name="<init>"/> - <Bug pattern="EI_EXPOSE_REP2"/> - </Match> - <Match> - <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/> - <Method name="<init>"/> - <Bug pattern="EI_EXPOSE_REP2"/> - </Match> - <Match> - <Class name="org.apache.pulsar.io.hdfs2.HdfsResources"/> - <Method name="<init>"/> - <Bug pattern="EI_EXPOSE_REP2"/> - </Match> - <Match> - <Class name="org.apache.pulsar.io.hdfs2.sink.HdfsSyncThread"/> - <Method name="<init>"/> - <Bug pattern="EI_EXPOSE_REP2"/> - </Match> -</FindBugsFilter> diff --git a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java deleted file mode 100644 index 2660199dc18..00000000000 --- a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; - -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.SinkContext; -import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink; -import org.mockito.Mock; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.testng.annotations.BeforeMethod; - -/** - * Simple base class for all the HDFS sink test cases. - * Provides utility methods for sending records to the sink. - * - */ -public abstract class AbstractHdfsSinkTest<K, V> { - - @Mock - protected SinkContext mockSinkContext; - - @Mock - protected Record<V> mockRecord; - - protected Map<String, Object> map; - protected HdfsAbstractSink<K, V> sink; - - @SuppressWarnings("unchecked") - @BeforeMethod(alwaysRun = true) - public final void setUp() throws Exception { - map = new HashMap<String, Object> (); - map.put("hdfsConfigResources", "../pulsar/pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml," - + "../pulsar/pulsar-io/hdfs2/src/test/resources/hadoop/hdfs-site.xml"); - map.put("directory", "/tmp/testing"); - map.put("filenamePrefix", "prefix"); - - mockSinkContext = mock(SinkContext.class); - - mockRecord = mock(Record.class); - when(mockRecord.getRecordSequence()).thenAnswer(new Answer<Optional<Long>>() { - long sequenceCounter = 0; - public Optional<Long> answer(InvocationOnMock invocation) throws Throwable { - return Optional.of(sequenceCounter++); - }}); - - when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() { - long sequenceCounter = 0; - public Optional<String> answer(InvocationOnMock invocation) throws Throwable { - return Optional.of( "key-" + sequenceCounter++); - }}); - - when(mockRecord.getValue()).thenAnswer(new Answer<String>() { - long sequenceCounter = 0; - public String answer(InvocationOnMock invocation) throws Throwable { - return new String( "value-" + sequenceCounter++ + "-" + UUID.randomUUID()); - }}); - - createSink(); - } - - protected abstract void createSink(); - - protected final void send(int numRecords) throws Exception { - for (int idx = 0; idx < numRecords; idx++) { - sink.write(mockRecord); - } - } - - protected final void runFor(int numSeconds) throws InterruptedException { - Producer producer = new Producer(); - producer.start(); - Thread.sleep(numSeconds * 1000); // Run for N seconds - producer.halt(); - producer.join(2000); - } - - protected final class Producer extends Thread { - public boolean keepRunning = true; - @Override - public void run() { - while (keepRunning) - try { - sink.write(mockRecord); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - public void halt() { - keepRunning = false; - } - - } -} diff --git a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java deleted file mode 100644 index 1d4b379011e..00000000000 --- a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.pulsar.io.hdfs2.Compression; -import org.apache.pulsar.io.hdfs2.sink.HdfsSinkConfig; -import org.testng.annotations.Test; - -import com.fasterxml.jackson.databind.exc.InvalidFormatException; - - -public class HdfsSinkConfigTests { - - @Test - public final void loadFromYamlFileTest() throws IOException { - File yamlFile = getFile("sinkConfig.yaml"); - HdfsSinkConfig config = HdfsSinkConfig.load(yamlFile.getAbsolutePath()); - assertNotNull(config); - assertEquals("core-site.xml", config.getHdfsConfigResources()); - assertEquals("/foo/bar", config.getDirectory()); - assertEquals("prefix", config.getFilenamePrefix()); - assertEquals(Compression.SNAPPY, config.getCompression()); - assertEquals("yyyy-MM-dd", config.getSubdirectoryPattern()); - } - - @Test - public final void loadFromMapTest() throws IOException { - Map<String, Object> map = new HashMap<String, Object> (); - map.put("hdfsConfigResources", "core-site.xml"); - map.put("directory", "/foo/bar"); - map.put("filenamePrefix", "prefix"); - map.put("compression", "SNAPPY"); - map.put("subdirectoryPattern", "yy-MM-dd"); - - HdfsSinkConfig config = HdfsSinkConfig.load(map); - assertNotNull(config); - assertEquals("core-site.xml", config.getHdfsConfigResources()); - assertEquals("/foo/bar", config.getDirectory()); - assertEquals("prefix", config.getFilenamePrefix()); - assertEquals(Compression.SNAPPY, config.getCompression()); - assertEquals("yy-MM-dd", config.getSubdirectoryPattern()); - } - - @Test - public final void validValidateTest() throws IOException { - Map<String, Object> map = new HashMap<String, Object> (); - map.put("hdfsConfigResources", "core-site.xml"); - map.put("directory", "/foo/bar"); - map.put("filenamePrefix", "prefix"); - map.put("fileExtension", ".txt"); - - HdfsSinkConfig config = HdfsSinkConfig.load(map); - config.validate(); - } - - @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "Required property not set.") - public final void missingDirectoryValidateTest() throws IOException { - Map<String, Object> map = new HashMap<String, Object> (); - map.put("hdfsConfigResources", "core-site.xml"); - - HdfsSinkConfig config = HdfsSinkConfig.load(map); - config.validate(); - } - - @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "Required property not set.") - public final void missingHdfsConfigsValidateTest() throws IOException { - Map<String, Object> map = new HashMap<String, Object> (); - map.put("directory", "/foo/bar"); - - HdfsSinkConfig config = HdfsSinkConfig.load(map); - config.validate(); - } - - @Test(expectedExceptions = InvalidFormatException.class) - public final void invalidCodecValidateTest() throws IOException { - Map<String, Object> map = new HashMap<String, Object> (); - map.put("hdfsConfigResources", "core-site.xml"); - map.put("directory", "/foo/bar"); - map.put("filenamePrefix", "prefix"); - map.put("fileExtension", ".txt"); - map.put("compression", "bad value"); - - HdfsSinkConfig config = HdfsSinkConfig.load(map); - config.validate(); - } - - @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "Sync Interval cannot be negative") - public final void invalidSyncIntervalTest() throws IOException { - Map<String, Object> map = new HashMap<String, Object> (); - map.put("hdfsConfigResources", "core-site.xml"); - map.put("directory", "/foo/bar"); - map.put("filenamePrefix", "prefix"); - map.put("fileExtension", ".txt"); - map.put("syncInterval", -1); - - HdfsSinkConfig config = HdfsSinkConfig.load(map); - config.validate(); - } - - @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "Max Pending Records must be a positive integer") - public final void invalidMaxPendingRecordsTest() throws IOException { - Map<String, Object> map = new HashMap<String, Object> (); - map.put("hdfsConfigResources", "core-site.xml"); - map.put("directory", "/foo/bar"); - map.put("filenamePrefix", "prefix"); - map.put("fileExtension", ".txt"); - map.put("maxPendingRecords", 0); - - HdfsSinkConfig config = HdfsSinkConfig.load(map); - config.validate(); - } - - @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "Values for both kerberosUserPrincipal & keytab are required.") - public final void kerberosValidateTest() throws IOException { - Map<String, Object> map = new HashMap<String, Object> (); - map.put("hdfsConfigResources", "core-site.xml"); - map.put("directory", "/foo/bar"); - map.put("filenamePrefix", "prefix"); - map.put("keytab", "/etc/keytab/hdfs.client.ktab"); - - HdfsSinkConfig config = HdfsSinkConfig.load(map); - config.validate(); - } - - private File getFile(String name) { - ClassLoader classLoader = getClass().getClassLoader(); - return new File(classLoader.getResource(name).getFile()); - } -} diff --git a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialSinkTests.java b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialSinkTests.java deleted file mode 100644 index 66cfc5b547c..00000000000 --- a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialSinkTests.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink.seq; - -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.testng.Assert.assertNotNull; -import org.apache.pulsar.io.hdfs2.sink.AbstractHdfsSinkTest; -import org.testng.SkipException; -import org.testng.annotations.Test; - -public class HdfsSequentialSinkTests extends AbstractHdfsSinkTest<Long, String> { - - @Override - protected void createSink() { - sink = new HdfsSequentialTextSink(); - } - - @Test - public final void write100Test() throws Exception { - map.put("filenamePrefix", "write100Test-seq"); - map.put("fileExtension", ".seq"); - map.put("syncInterval", 1000); - sink.open(map, mockSinkContext); - - assertNotNull(sink); - send(100); - - Thread.sleep(2000); - verify(mockRecord, times(100)).ack(); - sink.close(); - } - - @Test - public final void write5000Test() throws Exception { - map.put("filenamePrefix", "write5000Test-seq"); - map.put("fileExtension", ".seq"); - map.put("syncInterval", 1000); - sink.open(map, mockSinkContext); - - assertNotNull(sink); - send(5000); - - Thread.sleep(2000); - verify(mockRecord, times(5000)).ack(); - sink.close(); - } - - @Test - public final void tenSecondTest() throws Exception { - map.put("filenamePrefix", "tenSecondTest-seq"); - map.put("fileExtension", ".seq"); - map.put("syncInterval", 1000); - sink.open(map, mockSinkContext); - runFor(10); - sink.close(); - } - - @Test - public final void bzip2CompressionTest() throws Exception { - map.put("filenamePrefix", "bzip2CompressionTest-seq"); - map.put("compression", "BZIP2"); - map.remove("fileExtension"); - sink.open(map, mockSinkContext); - send(5000); - sink.close(); - verify(mockRecord, times(5000)).ack(); - } - - @Test - public final void deflateCompressionTest() throws Exception { - map.put("filenamePrefix", "deflateCompressionTest-seq"); - map.put("compression", "DEFLATE"); - map.remove("fileExtension"); - sink.open(map, mockSinkContext); - send(5000); - sink.close(); - verify(mockRecord, times(5000)).ack(); - } - - @Test - public final void zStandardCompressionTest() throws Exception { - if (System.getenv("LD_LIBRARY_PATH") == null) { - throw new SkipException("Skip zStandardCompressionTest since LD_LIBRARY_PATH is not set"); - } - map.put("filenamePrefix", "zStandardCompressionTest-seq"); - map.put("compression", "ZSTANDARD"); - map.remove("fileExtension"); - sink.open(map, mockSinkContext); - send(5000); - sink.close(); - verify(mockRecord, times(5000)).ack(); - } -} diff --git a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSinkTests.java b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSinkTests.java deleted file mode 100644 index a64345c4e11..00000000000 --- a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSinkTests.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink.seq; - -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.testng.Assert.assertNotNull; -import org.apache.pulsar.io.hdfs2.sink.AbstractHdfsSinkTest; -import org.testng.SkipException; -import org.testng.annotations.Test; - -public class HdfsTextSinkTests extends AbstractHdfsSinkTest<String, String> { - - @Override - protected void createSink() { - sink = new HdfsTextSink(); - } - - @Test - public final void write100Test() throws Exception { - map.put("filenamePrefix", "write100TestText-seq"); - map.put("fileExtension", ".seq"); - map.put("syncInterval", 1000); - sink.open(map, mockSinkContext); - - assertNotNull(sink); - assertNotNull(mockRecord); - send(100); - - Thread.sleep(2000); - verify(mockRecord, times(100)).ack(); - sink.close(); - } - - @Test - public final void write5000Test() throws Exception { - map.put("filenamePrefix", "write5000TestText-seq"); - map.put("fileExtension", ".seq"); - map.put("syncInterval", 1000); - sink.open(map, mockSinkContext); - - assertNotNull(sink); - assertNotNull(mockRecord); - send(5000); - - Thread.sleep(2000); - verify(mockRecord, times(5000)).ack(); - sink.close(); - } - - @Test - public final void tenSecondTest() throws Exception { - map.put("filenamePrefix", "tenSecondTestText-seq"); - map.put("fileExtension", ".seq"); - map.put("syncInterval", 1000); - sink.open(map, mockSinkContext); - - assertNotNull(mockRecord); - - runFor(10); - sink.close(); - } - - @Test - public final void bzip2CompressionTest() throws Exception { - map.put("filenamePrefix", "bzip2CompressionTestText-seq"); - map.put("compression", "BZIP2"); - map.remove("fileExtension"); - sink.open(map, mockSinkContext); - - assertNotNull(mockRecord); - - send(5000); - sink.close(); - verify(mockRecord, times(5000)).ack(); - } - - @Test - public final void deflateCompressionTest() throws Exception { - map.put("filenamePrefix", "deflateCompressionTestText-seq"); - map.put("compression", "DEFLATE"); - map.remove("fileExtension"); - sink.open(map, mockSinkContext); - - assertNotNull(mockRecord); - send(5000); - sink.close(); - verify(mockRecord, times(5000)).ack(); - } - - @Test - public final void zStandardCompressionTest() throws Exception { - if (System.getenv("LD_LIBRARY_PATH") == null) { - throw new SkipException("Skip zStandardCompressionTest since LD_LIBRARY_PATH is not set"); - } - map.put("filenamePrefix", "zStandardCompressionTestText-seq"); - map.put("compression", "ZSTANDARD"); - map.remove("fileExtension"); - sink.open(map, mockSinkContext); - - assertNotNull(mockRecord); - send(5000); - sink.close(); - verify(mockRecord, times(5000)).ack(); - } -} diff --git a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSinkTests.java b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSinkTests.java deleted file mode 100644 index 7bc5c15ee6a..00000000000 --- a/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSinkTests.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.hdfs2.sink.text; - -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import org.apache.pulsar.io.hdfs2.sink.AbstractHdfsSinkTest; -import org.testng.SkipException; -import org.testng.annotations.Test; - -public class HdfsStringSinkTests extends AbstractHdfsSinkTest<String, String> { - - @Override - protected void createSink() { - sink = new HdfsStringSink(); - } - - @Test - public final void write5000Test() throws Exception { - map.put("filenamePrefix", "write5000Test"); - map.put("fileExtension", ".txt"); - map.put("separator", '\n'); - sink.open(map, mockSinkContext); - send(5000); - sink.close(); - verify(mockRecord, times(5000)).ack(); - } - - @Test - public final void fiveByTwoThousandTest() throws Exception { - map.put("filenamePrefix", "fiveByTwoThousandTest"); - map.put("fileExtension", ".txt"); - map.put("separator", '\n'); - sink.open(map, mockSinkContext); - - for (int idx = 1; idx < 6; idx++) { - send(2000); - } - sink.close(); - verify(mockRecord, times(2000 * 5)).ack(); - } - - @Test - public final void tenSecondTest() throws Exception { - map.put("filenamePrefix", "tenSecondTest"); - map.put("fileExtension", ".txt"); - map.put("separator", '\n'); - sink.open(map, mockSinkContext); - runFor(10); - sink.close(); - } - - @Test - public final void maxPendingRecordsTest() throws Exception { - map.put("filenamePrefix", "maxPendingRecordsTest"); - map.put("fileExtension", ".txt"); - map.put("separator", '\n'); - map.put("maxPendingRecords", 500); - sink.open(map, mockSinkContext); - runFor(10); - sink.close(); - } - - @Test - public final void bzip2CompressionTest() throws Exception { - map.put("filenamePrefix", "bzip2CompressionTest"); - map.put("compression", "BZIP2"); - map.remove("fileExtension"); - map.put("separator", '\n'); - sink.open(map, mockSinkContext); - send(5000); - sink.close(); - verify(mockRecord, times(5000)).ack(); - } - - @Test - public final void deflateCompressionTest() throws Exception { - map.put("filenamePrefix", "deflateCompressionTest"); - map.put("compression", "DEFLATE"); - map.put("fileExtension", ".deflate"); - map.put("separator", '\n'); - sink.open(map, mockSinkContext); - send(50000); - sink.close(); - verify(mockRecord, times(50000)).ack(); - } - - @Test - public final void zStandardCompressionTest() throws Exception { - if (System.getenv("LD_LIBRARY_PATH") == null) { - throw new SkipException("Skip zStandardCompressionTest since LD_LIBRARY_PATH is not set"); - } - map.put("filenamePrefix", "zStandardCompressionTest"); - map.put("compression", "ZSTANDARD"); - map.put("fileExtension", ".zstandard"); - map.put("separator", '\n'); - sink.open(map, mockSinkContext); - send(50000); - sink.close(); - verify(mockRecord, times(50000)).ack(); - } -} diff --git a/pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml b/pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml deleted file mode 100644 index 31d1e98c475..00000000000 --- a/pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml +++ /dev/null @@ -1,32 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> - <property> - <name>fs.defaultFS</name> - <value>hdfs://0.0.0.0:8020</value> - </property> - <property> - <name>io.compression.codecs</name> - <value>org.apache.hadoop.io.compress.GzipCodec, - org.apache.hadoop.io.compress.DefaultCodec, - org.apache.hadoop.io.compress.SnappyCodec</value> - </property> -</configuration> diff --git a/pulsar-io/hdfs2/src/test/resources/hadoop/hdfs-site.xml b/pulsar-io/hdfs2/src/test/resources/hadoop/hdfs-site.xml deleted file mode 100644 index bb722f1f634..00000000000 --- a/pulsar-io/hdfs2/src/test/resources/hadoop/hdfs-site.xml +++ /dev/null @@ -1,34 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> - <property> - <name>dfs.replication</name> - <value>1</value> - </property> - <property> - <name>dfs.client.use.datanode.hostname</name> - <value>true</value> - </property> - <property> - <name>dfs.support.append</name> - <value>true</value> - </property> -</configuration> diff --git a/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml b/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml deleted file mode 100644 index 47ab4f9737a..00000000000 --- a/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml +++ /dev/null @@ -1,26 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -{ -"hdfsConfigResources": "core-site.xml", -"directory": "/foo/bar", -"filenamePrefix": "prefix", -"compression": "SNAPPY", -"subdirectoryPattern": "yyyy-MM-dd" -} \ No newline at end of file diff --git a/pulsar-io/hdfs3/pom.xml b/pulsar-io/hdfs3/pom.xml index d5ab99a5185..3f9fb1743af 100644 --- a/pulsar-io/hdfs3/pom.xml +++ b/pulsar-io/hdfs3/pom.xml @@ -18,7 +18,8 @@ under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.pulsar</groupId> @@ -29,7 +30,7 @@ <name>Pulsar IO :: Hdfs3</name> <dependencies> - <dependency> + <dependency> <groupId>${project.groupId}</groupId> <artifactId>pulsar-io-core</artifactId> <version>${project.version}</version> @@ -50,29 +51,28 @@ <artifactId>commons-collections4</artifactId> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>${hadoop3.version}</version> - <exclusions> - <exclusion> - <groupId>jakarta.activation</groupId> - <artifactId>jakarta.activation-api</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>*</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </exclusion> - </exclusions> - </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <exclusions> + <exclusion> + <groupId>jakarta.activation</groupId> + <artifactId>jakarta.activation-api</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + </exclusions> + </dependency> <dependency> <groupId>jakarta.activation</groupId> diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml index a81dbe787c2..ed363046c64 100644 --- a/pulsar-io/pom.xml +++ b/pulsar-io/pom.xml @@ -63,7 +63,6 @@ <module>kafka-connect-adaptor</module> <module>kafka-connect-adaptor-nar</module> <module>debezium</module> - <module>hdfs2</module> <module>canal</module> <module>file</module> <module>netty</module> @@ -97,7 +96,6 @@ <module>hdfs3</module> <module>jdbc</module> <module>data-generator</module> - <module>hdfs2</module> <module>canal</module> <module>file</module> <module>netty</module> diff --git a/tiered-storage/file-system/pom.xml b/tiered-storage/file-system/pom.xml index 76c417becc2..0fbe47dec2a 100644 --- a/tiered-storage/file-system/pom.xml +++ b/tiered-storage/file-system/pom.xml @@ -50,6 +50,10 @@ <groupId>org.slf4j</groupId> <artifactId>*</artifactId> </exclusion> + <exclusion> + <groupId>dnsjava</groupId> + <artifactId>dnsjava</artifactId> + </exclusion> </exclusions> </dependency> @@ -78,6 +82,10 @@ <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> + <exclusion> + <groupId>dnsjava</groupId> + <artifactId>dnsjava</artifactId> + </exclusion> </exclusions> </dependency> <!-- fix hadoop-commons vulnerable dependencies --> @@ -122,6 +130,10 @@ <groupId>org.slf4j</groupId> <artifactId>*</artifactId> </exclusion> + <exclusion> + <groupId>dnsjava</groupId> + <artifactId>dnsjava</artifactId> + </exclusion> </exclusions> </dependency>
