This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 261eab1 Added HDFS Sink (#2409) 261eab1 is described below commit 261eab12789c8c06b64cb7a5688ab0c867d609b1 Author: David Kjerrumgaard <35466513+david-stream...@users.noreply.github.com> AuthorDate: Thu Sep 6 11:06:33 2018 -0700 Added HDFS Sink (#2409) * Added HDFS Sink * Fixed issues identified during PR review * Fixed comment * Added HDFS Container to externalServices * Ignoring HdfsSink test for now * Removed HDFS Container to externalServices * Fixed ASL licensing * Fixed compile errors * Added HDFS to SinkType Enum --- pom.xml | 50 +++- pulsar-io/hdfs/pom.xml | 69 ++++++ .../apache/pulsar/io/hdfs/AbstractHdfsConfig.java | 86 +++++++ .../pulsar/io/hdfs/AbstractHdfsConnector.java | 258 +++++++++++++++++++++ .../org/apache/pulsar/io/hdfs/Compression.java | 26 +++ .../org/apache/pulsar/io/hdfs/HdfsResources.java | 51 ++++ .../org/apache/pulsar/io/hdfs/SecurityUtil.java | 91 ++++++++ .../org/apache/pulsar/io/hdfs/package-info.java | 19 ++ .../pulsar/io/hdfs/sink/HdfsAbstractSink.java | 121 ++++++++++ .../apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java | 108 +++++++++ .../apache/pulsar/io/hdfs/sink/HdfsSyncThread.java | 80 +++++++ .../apache/pulsar/io/hdfs/sink/package-info.java | 19 ++ .../sink/seq/HdfsAbstractSequenceFileSink.java | 96 ++++++++ .../io/hdfs/sink/seq/HdfsSequentialTextSink.java | 71 ++++++ .../pulsar/io/hdfs/sink/seq/HdfsTextSink.java | 54 +++++ .../pulsar/io/hdfs/sink/seq/package-info.java | 19 ++ .../hdfs/sink/text/HdfsAbstractTextFileSink.java | 79 +++++++ .../pulsar/io/hdfs/sink/text/HdfsStringSink.java | 34 +++ .../pulsar/io/hdfs/sink/text/package-info.java | 19 ++ .../resources/META-INF/services/pulsar-io.yaml | 22 ++ .../pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java | 119 ++++++++++ .../pulsar/io/hdfs/sink/HdfsSinkConfigTests.java | 154 ++++++++++++ .../io/hdfs/sink/seq/HdfsSequentialSinkTests.java | 94 ++++++++ .../pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java | 104 
+++++++++ .../io/hdfs/sink/text/HdfsStringSinkTests.java | 103 ++++++++ .../src/test/resources/hadoop/core-site.xml} | 38 +-- .../src/test/resources/hadoop/hdfs-site.xml} | 40 ++-- pulsar-io/hdfs/src/test/resources/sinkConfig.yaml | 25 ++ pulsar-io/pom.xml | 1 + .../integration/containers/HdfsContainer.java | 51 ++++ .../integration/functions/PulsarFunctionsTest.java | 7 +- .../tests/integration/io/HdfsSinkTester.java | 65 ++++++ .../pulsar/tests/integration/io/SinkTester.java | 3 +- .../tests/integration/suites/PulsarTestSuite.java | 1 + 34 files changed, 2117 insertions(+), 60 deletions(-) diff --git a/pom.xml b/pom.xml index 2352d5e..b349925 100644 --- a/pom.xml +++ b/pom.xml @@ -496,12 +496,6 @@ flexible messaging model and an intuitive client API.</description> </dependency> <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - <version>2.5</version> - </dependency> - - <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> @@ -1350,6 +1344,50 @@ flexible messaging model and an intuitive client API.</description> <profile> <id>docker</id> </profile> + + <profile> + <!-- Checks style and licensing requirements. This is a good + idea to run for contributions and for the release process. While it would + be nice to run always these plugins can considerably slow the build and have + proven to create unstable builds in our multi-module project and when building + using multiple threads. The stability issues seen with Checkstyle in multi-module + builds include false-positives and false negatives. 
--> + <id>contrib-check</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>check</goal> + </goals> + <phase>verify</phase> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <executions> + <execution> + <id>check-style</id> + <phase>verify</phase> + <configuration> + <configLocation>./buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation> + <suppressionsLocation>/buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation> + <encoding>UTF-8</encoding> + </configuration> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> </profiles> <repositories> diff --git a/pulsar-io/hdfs/pom.xml b/pulsar-io/hdfs/pom.xml new file mode 100644 index 0000000..0d55207 --- /dev/null +++ b/pulsar-io/hdfs/pom.xml @@ -0,0 +1,69 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-io</artifactId> + <version>2.2.0-incubating-SNAPSHOT</version> + </parent> + <artifactId>pulsar-io-hdfs</artifactId> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-io-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-yaml</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>3.1.1</version> + </dependency> + + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java new file mode 100644 index 0000000..529c350 --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.hdfs; + +import java.io.Serializable; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.experimental.Accessors; + +import org.apache.commons.lang.StringUtils; + +/** + * Configuration object for all HDFS components. + */ +@Data +@Setter +@Getter +@EqualsAndHashCode +@ToString +@Accessors(chain = true) +public abstract class AbstractHdfsConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * A file, or a comma-separated list of files, containing the Hadoop file system configuration, + * e.g. 'core-site.xml', 'hdfs-site.xml'. + */ + private String hdfsConfigResources; + + /** + * The HDFS directory from which files are read or to which they are written. + */ + private String directory; + + /** + * The character encoding for the files, e.g. UTF-8, ASCII, etc. + */ + private String encoding; + + /** + * The compression codec used to compress/de-compress the files on HDFS. + */ + private Compression compression; + + /** + * The Kerberos user principal account to use for authentication. + */ + private String kerberosUserPrincipal; + + /** + * The full pathname to the Kerberos keytab file to use for authentication.
+ */ + private String keytab; + + public void validate() { + if (StringUtils.isEmpty(hdfsConfigResources) || StringUtils.isEmpty(directory)) { + throw new IllegalArgumentException("Required property not set."); + } + + if ((StringUtils.isNotEmpty(kerberosUserPrincipal) && StringUtils.isEmpty(keytab)) + || (StringUtils.isEmpty(kerberosUserPrincipal) && StringUtils.isNotEmpty(keytab))) { + throw new IllegalArgumentException("Values for both kerberosUserPrincipal & keytab are required."); + } + } +} diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java new file mode 100644 index 0000000..0eccd93 --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.hdfs; + +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URI; +import java.nio.charset.Charset; +import java.security.PrivilegedExceptionAction; +import java.util.Collections; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.concurrent.atomic.AtomicReference; + +import javax.net.SocketFactory; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.pulsar.io.hdfs.sink.HdfsSinkConfig; + +/** + * A Simple abstract class for HDFS connectors. + * Provides methods for connecting to HDFS + */ +public abstract class AbstractHdfsConnector { + + private static final Object RESOURCES_LOCK = new Object(); + + // Hadoop Configuration, Filesystem, and UserGroupInformation (optional) + protected final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>(); + protected AbstractHdfsConfig connectorConfig; + protected CompressionCodecFactory compressionCodecFactory; + + public AbstractHdfsConnector() { + hdfsResources.set(new HdfsResources(null, null, null)); + } + + /* + * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources. 
+ */ + protected HdfsResources resetHDFSResources(HdfsSinkConfig hdfsSinkConfig) throws IOException { + Configuration config = new ExtendedConfiguration(); + config.setClassLoader(Thread.currentThread().getContextClassLoader()); + + getConfig(config, connectorConfig.getHdfsConfigResources()); + + // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout + checkHdfsUriForTimeout(config); + + /* Disable caching of Configuration and FileSystem objects, else we cannot reconfigure + * the processor without a complete restart + */ + String disableCacheName = String.format("fs.%s.impl.disable.cache", + FileSystem.getDefaultUri(config).getScheme()); + config.set(disableCacheName, "true"); + + // If kerberos is enabled, create the file system as the kerberos principal + // -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time + FileSystem fs; + UserGroupInformation ugi; + synchronized (RESOURCES_LOCK) { + if (SecurityUtil.isSecurityEnabled(config)) { + ugi = SecurityUtil.loginKerberos(config, + connectorConfig.getKerberosUserPrincipal(), connectorConfig.getKeytab()); + fs = getFileSystemAsUser(config, ugi); + } else { + config.set("ipc.client.fallback-to-simple-auth-allowed", "true"); + config.set("hadoop.security.authentication", "simple"); + ugi = SecurityUtil.loginSimple(config); + fs = getFileSystemAsUser(config, ugi); + } + } + return new HdfsResources(config, fs, ugi); + } + + private static Configuration getConfig(final Configuration config, String res) throws IOException { + boolean foundResources = false; + if (null != res) { + String[] resources = res.split(","); + for (String resource : resources) { + config.addResource(new Path(resource.trim())); + foundResources = true; + } + } + + if (!foundResources) { + // check that at least 1 non-default resource is available on the classpath + String configStr = config.toString(); + for (String resource :
configStr.substring(configStr.indexOf(":") + 1).split(",")) { + if (!resource.contains("default") && config.getResource(resource.trim()) != null) { + foundResources = true; + break; + } + } + } + + if (!foundResources) { + throw new IOException("Could not find any of the " + res + " on the classpath"); + } + return config; + } + + /* + * Reduce the timeout of a socket connection from the default in FileSystem.get() + */ + protected void checkHdfsUriForTimeout(Configuration config) throws IOException { + URI hdfsUri = FileSystem.getDefaultUri(config); + String address = hdfsUri.getAuthority(); + int port = hdfsUri.getPort(); + if (address == null || address.isEmpty() || port < 0) { + return; + } + InetSocketAddress namenode = NetUtils.createSocketAddr(address, port); + SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config); + Socket socket = null; + try { + socket = socketFactory.createSocket(); + NetUtils.connect(socket, namenode, 1000); // 1 second timeout + } finally { + IOUtils.closeQuietly(socket); + } + } + + /** + * This exists in order to allow unit tests to override it so that they don't take several + * minutes waiting for UDP packets to be received. 
+ * + * @param config + * the configuration to use + * @return the FileSystem that is created for the given Configuration + * @throws IOException + * if unable to create the FileSystem + */ + protected FileSystem getFileSystem(final Configuration config) throws IOException { + return FileSystem.get(config); + } + + protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException { + try { + return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { + @Override + public FileSystem run() throws Exception { + return FileSystem.get(config); + } + }); + } catch (InterruptedException e) { + throw new IOException("Unable to create file system: " + e.getMessage()); + } + } + + protected Configuration getConfiguration() { + return hdfsResources.get().getConfiguration(); + } + + protected FileSystem getFileSystem() { + return hdfsResources.get().getFileSystem(); + } + + protected UserGroupInformation getUserGroupInformation() { + return hdfsResources.get().getUserGroupInformation(); + } + + protected String getEncoding() { + return StringUtils.isNotBlank(connectorConfig.getEncoding()) + ? connectorConfig.getEncoding() : Charset.defaultCharset().name(); + } + + protected CompressionCodec getCompressionCodec() { + if (connectorConfig.getCompression() == null) { + return null; + } + + CompressionCodec codec = getCompressionCodecFactory() + .getCodecByName(connectorConfig.getCompression().name()); + + return (codec != null) ? codec : new DefaultCodec(); + } + + protected CompressionCodecFactory getCompressionCodecFactory() { + if (compressionCodecFactory == null) { + compressionCodecFactory = new CompressionCodecFactory(getConfiguration()); + } + + return compressionCodecFactory; + } + + /** + * Extending Hadoop Configuration to prevent it from caching classes that can't be found. 
Since users may be + * adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load + * something that was previously not found, but might now be available. + * Reference the original getClassByNameOrNull from Configuration. + */ + static class ExtendedConfiguration extends Configuration { + + private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> cacheClasses = new WeakHashMap<>(); + + @Override + public Class<?> getClassByNameOrNull(String name) { + final ClassLoader classLoader = getClassLoader(); + + Map<String, WeakReference<Class<?>>> map; + synchronized (cacheClasses) { + map = cacheClasses.get(classLoader); + if (map == null) { + map = Collections.synchronizedMap(new WeakHashMap<>()); + cacheClasses.put(classLoader, map); + } + } + + Class<?> clazz = null; + WeakReference<Class<?>> ref = map.get(name); + if (ref != null) { + clazz = ref.get(); + } + + if (clazz == null) { + try { + clazz = Class.forName(name, true, classLoader); + } catch (ClassNotFoundException e) { + return null; + } + // two putters can race here, but they'll put the same class + map.put(name, new WeakReference<>(clazz)); + return clazz; + } else { + // cache hit + return clazz; + } + } + + } +} diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java new file mode 100644 index 0000000..97dba53 --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.hdfs; + +/** + * An enumeration of compression codecs available for HDFS. + */ +public enum Compression { + BZIP2, DEFLATE, GZIP, LZ4, SNAPPY +} diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java new file mode 100644 index 0000000..1d04c6c --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * A wrapper class for HDFS resources. 
+ */ +public class HdfsResources { + + private final Configuration configuration; + private final FileSystem fileSystem; + private final UserGroupInformation userGroupInformation; + + public HdfsResources(Configuration config, FileSystem fs, UserGroupInformation ugi) { + this.configuration = config; + this.fileSystem = fs; + this.userGroupInformation = ugi; + } + + public Configuration getConfiguration() { + return configuration; + } + + public FileSystem getFileSystem() { + return fileSystem; + } + + public UserGroupInformation getUserGroupInformation() { + return userGroupInformation; + } +} diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java new file mode 100644 index 0000000..c5462d3 --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.hdfs; + +import java.io.IOException; + +import org.apache.commons.lang3.Validate; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Provides synchronized access to UserGroupInformation to avoid multiple processors/services from + * interfering with each other. + */ +public class SecurityUtil { + public static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication"; + public static final String KERBEROS = "kerberos"; + + /** + * Initializes UserGroupInformation with the given Configuration and performs the login for the + * given principal and keytab. All logins should happen through this class to ensure other threads + * are not concurrently modifying UserGroupInformation. + * <p/> + * @param config the configuration instance + * @param principal the principal to authenticate as + * @param keyTab the keytab to authenticate with + * + * @return the UGI for the given principal + * + * @throws IOException if login failed + */ + public static synchronized UserGroupInformation loginKerberos(final Configuration config, + final String principal, final String keyTab) throws IOException { + Validate.notNull(config); + Validate.notNull(principal); + Validate.notNull(keyTab); + + UserGroupInformation.setConfiguration(config); + UserGroupInformation.loginUserFromKeytab(principal.trim(), keyTab.trim()); + return UserGroupInformation.getCurrentUser(); + } + + /** + * Initializes UserGroupInformation with the given Configuration and + * returns UserGroupInformation.getLoginUser(). All logins should happen + * through this class to ensure other threads are not concurrently + * modifying UserGroupInformation. 
+ * + * @param config the configuration instance + * + * @return the UGI of the current login user + * + * @throws IOException if login failed + */ + public static synchronized UserGroupInformation loginSimple(final Configuration config) throws IOException { + Validate.notNull(config); + UserGroupInformation.setConfiguration(config); + return UserGroupInformation.getLoginUser(); + } + + /** + * Checks whether the given Configuration selects Kerberos authentication, i.e. whether its + * hadoop.security.authentication value equals "kerberos" (compared case-insensitively). + * All checks for isSecurityEnabled() should happen through this method. + * + * @param config the given configuration + * + * @return true if kerberos is enabled on the given configuration, false otherwise + * + */ + public static boolean isSecurityEnabled(final Configuration config) { + Validate.notNull(config); + return KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION)); + } +} diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java new file mode 100644 index 0000000..4294852 --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.hdfs; \ No newline at end of file diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java new file mode 100644 index 0000000..18184e2 --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.hdfs.sink; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataOutputStreamBuilder; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.io.hdfs.AbstractHdfsConnector; +import org.apache.pulsar.io.hdfs.HdfsResources; + +/** + * A Simple abstract class for HDFS sink. + * Users need to implement extractKeyValue function to use this sink. + */ +public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector implements Sink<V> { + + protected HdfsSinkConfig hdfsSinkConfig; + protected BlockingQueue<Record<V>> unackedRecords; + protected HdfsSyncThread<V> syncThread; + private Path path; + private FSDataOutputStream hdfsStream; + + public abstract KeyValue<K, V> extractKeyValue(Record<V> record); + protected abstract void createWriter() throws IOException; + + @Override + public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { + hdfsSinkConfig = HdfsSinkConfig.load(config); + hdfsSinkConfig.validate(); + connectorConfig = hdfsSinkConfig; + unackedRecords = new LinkedBlockingQueue<Record<V>> (hdfsSinkConfig.getMaxPendingRecords()); + connectToHdfs(); + createWriter(); + launchSyncThread(); + } + + @Override + public void close() throws Exception { + syncThread.halt(); + syncThread.join(0); + } + + protected final void connectToHdfs() throws IOException { + try { + HdfsResources resources = 
hdfsResources.get(); + + if (resources.getConfiguration() == null) { + resources = this.resetHDFSResources(hdfsSinkConfig); + hdfsResources.set(resources); + } + } catch (IOException ex) { + hdfsResources.set(new HdfsResources(null, null, null)); + throw ex; + } + } + + @SuppressWarnings("rawtypes") + protected final FSDataOutputStreamBuilder getOutputStreamBuilder() throws IOException { + Path path = getPath(); + FileSystem fs = getFileSystemAsUser(getConfiguration(), getUserGroupInformation()); + FSDataOutputStreamBuilder builder = fs.exists(path) ? fs.appendFile(path) : fs.createFile(path); + return builder.recursive().permission(new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + } + + protected FSDataOutputStream getHdfsStream() throws IllegalArgumentException, IOException { + if (hdfsStream == null) { + hdfsStream = getOutputStreamBuilder().build(); + } + return hdfsStream; + } + + protected final Path getPath() { + if (path == null) { + String ext = ""; + if (StringUtils.isNotBlank(hdfsSinkConfig.getFileExtension())) { + ext = hdfsSinkConfig.getFileExtension(); + } else if (getCompressionCodec() != null) { + ext = getCompressionCodec().getDefaultExtension(); + } + + path = new Path(FilenameUtils.concat(hdfsSinkConfig.getDirectory(), + hdfsSinkConfig.getFilenamePrefix() + "-" + System.currentTimeMillis() + ext)); + } + return path; + } + + protected final void launchSyncThread() throws IOException { + syncThread = new HdfsSyncThread<V>(getHdfsStream(), unackedRecords, hdfsSinkConfig.getSyncInterval()); + syncThread.start(); + } +} \ No newline at end of file diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java new file mode 100644 index 0000000..e9f4cae --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.hdfs.sink; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.experimental.Accessors; + +import org.apache.commons.lang.StringUtils; +import org.apache.pulsar.io.hdfs.AbstractHdfsConfig; + +/** + * Configuration object for all HDFS Sink components. + */ +@Data +@Setter +@Getter +@EqualsAndHashCode(callSuper = false) +@ToString +@Accessors(chain = true) +public class HdfsSinkConfig extends AbstractHdfsConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * The prefix of the files to create inside the HDFS directory, i.e. a value of "topicA" + * will result in files named topicA-, topicA-, etc being produced + */ + private String filenamePrefix; + + /** + * The extension to add to the files written to HDFS, e.g. '.txt', '.seq', etc. + */ + private String fileExtension; + + /** + * The character to use to separate records in a text file. 
If no value is provided + * then the content from all of the records will be concatenated together in one continuous + * byte array. + */ + private char separator; + + /** + * The interval (in milliseconds) between calls to flush data to HDFS disk. + */ + private long syncInterval; + + /** + * The maximum number of records that we hold in memory before acking. Default is Integer.MAX_VALUE. + * Setting this value to one, results in every record being sent to disk before the record is acked, + * while setting it to a higher values allows us to buffer records before flushing them all to disk. + */ + private int maxPendingRecords = Integer.MAX_VALUE; + + public static HdfsSinkConfig load(String yamlFile) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return mapper.readValue(new File(yamlFile), HdfsSinkConfig.class); + } + + public static HdfsSinkConfig load(Map<String, Object> map) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(new ObjectMapper().writeValueAsString(map), HdfsSinkConfig.class); + } + + @Override + public void validate() { + super.validate(); + if ((StringUtils.isEmpty(fileExtension) && getCompression() == null) + || StringUtils.isEmpty(filenamePrefix)) { + throw new IllegalArgumentException("Required property not set."); + } + + if (syncInterval < 0) { + throw new IllegalArgumentException("Sync Interval cannot be negative"); + } + + if (maxPendingRecords < 1) { + throw new IllegalArgumentException("Max Pending Records must be a positive integer"); + } + } +} diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java new file mode 100644 index 0000000..3c19ed8 --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.hdfs.sink; + +import java.io.IOException; +import java.util.concurrent.BlockingQueue; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.fs.Syncable; +import org.apache.pulsar.functions.api.Record; + +/** + * A thread that runs in the background and acknowledges Records + * after they have been written to disk. 
+ * + * @param <V> + */ +public class HdfsSyncThread<V> extends Thread { + + private final Syncable stream; + private final BlockingQueue<Record<V>> unackedRecords; + private final long syncInterval; + private boolean keepRunning = true; + + public HdfsSyncThread(Syncable stream, BlockingQueue<Record<V>> unackedRecords, long syncInterval) { + this.stream = stream; + this.unackedRecords = unackedRecords; + this.syncInterval = syncInterval; + } + + @Override + public void run() { + while (keepRunning) { + try { + Thread.sleep(syncInterval); + ackRecords(); + } catch (InterruptedException e) { + return; + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public final void halt() throws IOException, InterruptedException { + keepRunning = false; + ackRecords(); + } + + private void ackRecords() throws IOException, InterruptedException { + + if (CollectionUtils.isEmpty(unackedRecords)) { + return; + } + + synchronized (stream) { + stream.hsync(); + } + + while (!unackedRecords.isEmpty()) { + unackedRecords.take().ack(); + } + } +} diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/package-info.java new file mode 100644 index 0000000..c6506d9 --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.hdfs.sink; \ No newline at end of file diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java new file mode 100644 index 0000000..7c61c20 --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.hdfs.sink.seq; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.io.SequenceFile.Writer.Option; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.hdfs.sink.HdfsAbstractSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HDFS Sink that writes it contents to HDFS as Sequence Files. + * + * @param <K> - The incoming Key type + * @param <V> - The incoming Value type + * @param <HdfsK> - The HDFS Key type + * @param <HdfsV> - The HDFS Value type + */ +public abstract class HdfsAbstractSequenceFileSink<K, V, HdfsK, HdfsV> + extends HdfsAbstractSink<K, V> implements Sink<V> { + + private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractSequenceFileSink.class); + + protected AtomicLong counter; + protected FSDataOutputStream hdfsStream; + protected Writer writer = null; + + public abstract KeyValue<HdfsK, HdfsV> convert(KeyValue<K, V> kv); + + @Override + public void close() throws Exception { + writer.close(); + super.close(); + } + + @Override + protected void createWriter() throws IOException { + writer = getWriter(); + } + + @Override + public void write(Record<V> record) { + try { + KeyValue<K, V> kv = extractKeyValue(record); + KeyValue<HdfsK, HdfsV> keyValue = convert(kv); + writer.append(keyValue.getKey(), keyValue.getValue()); + unackedRecords.put(record); + } catch (IOException | InterruptedException e) { + LOG.error("Unable to write to file " + getPath(), e); + record.fail(); + } + } + + protected Writer getWriter() throws IOException { + counter = new AtomicLong(0); + List<Option> options = getOptions(); + return 
SequenceFile.createWriter(getConfiguration(), + options.toArray(new Option[options.size()])); + } + + protected List<Option> getOptions() throws IllegalArgumentException, IOException { + List<Option> list = new ArrayList<Option>(); + list.add(Writer.stream(getHdfsStream())); + + if (getCompressionCodec() != null) { + list.add(Writer.compression(SequenceFile.CompressionType.RECORD, getCompressionCodec())); + } + return list; + } +} diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java new file mode 100644 index 0000000..84ce09f --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.hdfs.sink.seq; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.io.SequenceFile.Writer.Option; +import org.apache.hadoop.io.Text; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; + +/** + * This Sink should be used when the records are originating from a sequential source, + * and we want to retain the record sequence.This class uses the record's sequence id as + * the sequence id in the HDFS Sequence File if it is available, if not a sequence id is + * auto-generated for each new record. + */ +public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, String, LongWritable, Text> { + + private AtomicLong counter; + + @Override + public Writer getWriter() throws IOException { + counter = new AtomicLong(0); + + return SequenceFile + .createWriter( + getConfiguration(), + getOptions().toArray(new Option[getOptions().size()])); + } + + @Override + protected List<Option> getOptions() throws IllegalArgumentException, IOException { + List<Option> opts = super.getOptions(); + opts.add(Writer.keyClass(LongWritable.class)); + opts.add(Writer.valueClass(Text.class)); + return opts; + } + + @Override + public KeyValue<Long, String> extractKeyValue(Record<String> record) { + Long sequence = record.getRecordSequence().orElseGet(() -> new Long(counter.incrementAndGet())); + return new KeyValue<>(sequence, new String(record.getValue())); + } + + @Override + public KeyValue<LongWritable, Text> convert(KeyValue<Long, String> kv) { + return new KeyValue<>(new LongWritable(kv.getKey()), new Text(kv.getValue())); + } +} diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java 
b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java new file mode 100644 index 0000000..84ebc07 --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.hdfs.sink.seq; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.io.SequenceFile.Writer.Option; +import org.apache.hadoop.io.Text; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; + +/** + * A Simple Sink class for Hdfs Sequence File. 
+ */ +public class HdfsTextSink extends + HdfsAbstractSequenceFileSink<String, String, Text, Text> { + + @Override + protected List<Option> getOptions() throws IllegalArgumentException, IOException { + List<Option> opts = super.getOptions(); + opts.add(Writer.keyClass(Text.class)); + opts.add(Writer.valueClass(Text.class)); + return opts; + } + + @Override + public KeyValue<String, String> extractKeyValue(Record<String> record) { + String key = record.getKey().orElseGet(() -> new String(record.getValue())); + return new KeyValue<>(key, new String(record.getValue())); + } + + @Override + public KeyValue<Text, Text> convert(KeyValue<String, String> kv) { + return new KeyValue<>(new Text(kv.getKey()), new Text(kv.getValue())); + } +} diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java new file mode 100644 index 0000000..025311a --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.hdfs.sink.seq; \ No newline at end of file diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java new file mode 100644 index 0000000..6fb50a5 --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.hdfs.sink.text; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; + +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.hdfs.sink.HdfsAbstractSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for HDFS Sinks that write their contents to HDFS as Text Files. 
+ * + * @param <K> + * @param <V> + */ +public abstract class HdfsAbstractTextFileSink<K, V> extends HdfsAbstractSink<K, V> implements Sink<V> { + + private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractTextFileSink.class); + + protected OutputStreamWriter writer; + + @Override + protected void createWriter() throws IOException { + writer = new OutputStreamWriter(new BufferedOutputStream(openHdfsStream()), getEncoding()); + } + + @Override + public void close() throws Exception { + writer.close(); + super.close(); + } + + @Override + public void write(Record<V> record) { + try { + KeyValue<K, V> kv = extractKeyValue(record); + writer.write(kv.getValue().toString()); + + if (hdfsSinkConfig.getSeparator() != '\u0000') { + writer.write(hdfsSinkConfig.getSeparator()); + } + unackedRecords.put(record); + } catch (IOException | InterruptedException e) { + LOG.error("Unable to write to file " + getPath(), e); + record.fail(); + } + } + + private OutputStream openHdfsStream() throws IOException { + if (hdfsSinkConfig.getCompression() != null) { + return getCompressionCodec().createOutputStream(getHdfsStream()); + } else { + return getHdfsStream(); + } + } +} \ No newline at end of file diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java new file mode 100644 index 0000000..355c6df --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.hdfs.sink.text; + +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.KeyValue; +import org.apache.pulsar.io.core.Sink; + +/** + * A Simple Sink class for Hdfs Text File. + */ +public class HdfsStringSink extends HdfsAbstractTextFileSink<String, String> implements Sink<String> { + @Override + public KeyValue<String, String> extractKeyValue(Record<String> record) { + String key = record.getKey().orElseGet(() -> new String(record.getValue())); + return new KeyValue<>(key, new String(record.getValue())); + } +} diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/package-info.java new file mode 100644 index 0000000..9ade570 --- /dev/null +++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.hdfs.sink.text; \ No newline at end of file diff --git a/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml new file mode 100644 index 0000000..f2a2b55 --- /dev/null +++ b/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +name: hdfs +description: Writes data into HDFS +sinkClass: org.apache.pulsar.io.hdfs.sink.text.HdfsStringSink diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java new file mode 100644 index 0000000..32085dd --- /dev/null +++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.hdfs.sink; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.SinkContext; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.testng.annotations.BeforeMethod; + +/** + * Simple base class for all the HDFS sink test cases. + * Provides utility methods for sending records to the sink. 
+ * + */ +public abstract class AbstractHdfsSinkTest<K, V> { + + @Mock + protected SinkContext mockSinkContext; + + @Mock + protected Record<V> mockRecord; + + protected Map<String, Object> map; + protected HdfsAbstractSink<K, V> sink; + + @SuppressWarnings("unchecked") + @BeforeMethod + public final void setUp() throws Exception { + map = new HashMap<String, Object> (); + map.put("hdfsConfigResources", "../incubator-pulsar/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml," + + "../incubator-pulsar/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml"); + map.put("directory", "/tmp/testing"); + map.put("filenamePrefix", "prefix"); + + mockSinkContext = mock(SinkContext.class); + + mockRecord = mock(Record.class); + when(mockRecord.getRecordSequence()).thenAnswer(new Answer<Optional<Long>>() { + long sequenceCounter = 0; + public Optional<Long> answer(InvocationOnMock invocation) throws Throwable { + return Optional.of(sequenceCounter++); + }}); + + when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() { + long sequenceCounter = 0; + public Optional<String> answer(InvocationOnMock invocation) throws Throwable { + return Optional.of( "key-" + sequenceCounter++); + }}); + + when(mockRecord.getValue()).thenAnswer(new Answer<String>() { + long sequenceCounter = 0; + public String answer(InvocationOnMock invocation) throws Throwable { + return new String( "value-" + sequenceCounter++ + "-" + UUID.randomUUID().toString()); + }}); + + createSink(); + } + + protected abstract void createSink(); + + protected final void send(int numRecords) throws Exception { + for (int idx = 0; idx < numRecords; idx++) { + sink.write(mockRecord); + } + } + + protected final void runFor(int numSeconds) throws InterruptedException { + Producer producer = new Producer(); + producer.start(); + Thread.sleep(numSeconds * 1000); // Run for N seconds + producer.halt(); + producer.join(2000); + } + + protected final class Producer extends Thread { + public boolean keepRunning = 
true; + @Override + public void run() { + while (keepRunning) + try { + sink.write(mockRecord); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + public void halt() { + keepRunning = false; + } + + } +} diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java new file mode 100644 index 0000000..2f0b3f3 --- /dev/null +++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.hdfs.sink; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.pulsar.io.hdfs.Compression; +import org.testng.annotations.Test; + +import com.fasterxml.jackson.databind.exc.InvalidFormatException; + + +public class HdfsSinkConfigTests { + + @Test + public final void loadFromYamlFileTest() throws IOException { + File yamlFile = getFile("sinkConfig.yaml"); + HdfsSinkConfig config = HdfsSinkConfig.load(yamlFile.getAbsolutePath()); + assertNotNull(config); + assertEquals("core-site.xml", config.getHdfsConfigResources()); + assertEquals("/foo/bar", config.getDirectory()); + assertEquals("prefix", config.getFilenamePrefix()); + assertEquals(Compression.SNAPPY, config.getCompression()); + } + + @Test + public final void loadFromMapTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("hdfsConfigResources", "core-site.xml"); + map.put("directory", "/foo/bar"); + map.put("filenamePrefix", "prefix"); + map.put("compression", "SNAPPY"); + + HdfsSinkConfig config = HdfsSinkConfig.load(map); + assertNotNull(config); + assertEquals("core-site.xml", config.getHdfsConfigResources()); + assertEquals("/foo/bar", config.getDirectory()); + assertEquals("prefix", config.getFilenamePrefix()); + assertEquals(Compression.SNAPPY, config.getCompression()); + } + + @Test + public final void validValidateTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("hdfsConfigResources", "core-site.xml"); + map.put("directory", "/foo/bar"); + map.put("filenamePrefix", "prefix"); + map.put("fileExtension", ".txt"); + + HdfsSinkConfig config = HdfsSinkConfig.load(map); + config.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Required property not 
set.") + public final void missingDirectoryValidateTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("hdfsConfigResources", "core-site.xml"); + + HdfsSinkConfig config = HdfsSinkConfig.load(map); + config.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Required property not set.") + public final void missingHdfsConfigsValidateTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("directory", "/foo/bar"); + + HdfsSinkConfig config = HdfsSinkConfig.load(map); + config.validate(); + } + + @Test(expectedExceptions = InvalidFormatException.class) + public final void invalidCodecValidateTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("hdfsConfigResources", "core-site.xml"); + map.put("directory", "/foo/bar"); + map.put("filenamePrefix", "prefix"); + map.put("fileExtension", ".txt"); + map.put("compression", "bad value"); + + HdfsSinkConfig config = HdfsSinkConfig.load(map); + config.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Sync Interval cannot be negative") + public final void invalidSyncIntervalTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("hdfsConfigResources", "core-site.xml"); + map.put("directory", "/foo/bar"); + map.put("filenamePrefix", "prefix"); + map.put("fileExtension", ".txt"); + map.put("syncInterval", -1); + + HdfsSinkConfig config = HdfsSinkConfig.load(map); + config.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Max Pending Records must be a positive integer") + public final void invalidMaxPendingRecordsTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("hdfsConfigResources", "core-site.xml"); + 
map.put("directory", "/foo/bar"); + map.put("filenamePrefix", "prefix"); + map.put("fileExtension", ".txt"); + map.put("maxPendingRecords", 0); + + HdfsSinkConfig config = HdfsSinkConfig.load(map); + config.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Values for both kerberosUserPrincipal & keytab are required.") + public final void kerberosValidateTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("hdfsConfigResources", "core-site.xml"); + map.put("directory", "/foo/bar"); + map.put("filenamePrefix", "prefix"); + map.put("keytab", "/etc/keytab/hdfs.client.ktab"); + + HdfsSinkConfig config = HdfsSinkConfig.load(map); + config.validate(); + } + + private File getFile(String name) { + ClassLoader classLoader = getClass().getClassLoader(); + return new File(classLoader.getResource(name).getFile()); + } +} diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialSinkTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialSinkTests.java new file mode 100644 index 0000000..d54e5ec --- /dev/null +++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialSinkTests.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.hdfs.sink.seq; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertNotNull; + +import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest; +import org.testng.annotations.Test; + +public class HdfsSequentialSinkTests extends AbstractHdfsSinkTest<Long, String> { + + @Override + protected void createSink() { + sink = new HdfsSequentialTextSink(); + } + + @Test(enabled = false) + public final void write100Test() throws Exception { + map.put("filenamePrefix", "write100Test-seq"); + map.put("fileExtension", ".seq"); + map.put("syncInterval", 1000); + sink.open(map, mockSinkContext); + + assertNotNull(sink); + send(100); + + Thread.sleep(2000); + verify(mockRecord, times(100)).ack(); + sink.close(); + } + + @Test(enabled = false) + public final void write5000Test() throws Exception { + map.put("filenamePrefix", "write5000Test-seq"); + map.put("fileExtension", ".seq"); + map.put("syncInterval", 1000); + sink.open(map, mockSinkContext); + + assertNotNull(sink); + send(5000); + + Thread.sleep(2000); + verify(mockRecord, times(5000)).ack(); + sink.close(); + } + + @Test(enabled = false) + public final void tenSecondTest() throws Exception { + map.put("filenamePrefix", "tenSecondTest-seq"); + map.put("fileExtension", ".seq"); + map.put("syncInterval", 1000); + sink.open(map, mockSinkContext); + runFor(10); + sink.close(); + } + + @Test(enabled = false) + public final void bzip2CompressionTest() throws Exception { + map.put("filenamePrefix", 
"bzip2CompressionTest-seq"); + map.put("compression", "BZIP2"); + map.remove("fileExtension"); + sink.open(map, mockSinkContext); + send(5000); + verify(mockRecord, times(5000)).ack(); + } + + @Test(enabled = false) + public final void deflateCompressionTest() throws Exception { + map.put("filenamePrefix", "deflateCompressionTest-seq"); + map.put("compression", "DEFLATE"); + map.remove("fileExtension"); + sink.open(map, mockSinkContext); + send(5000); + verify(mockRecord, times(5000)).ack(); + } +} diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java new file mode 100644 index 0000000..bb720fa --- /dev/null +++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.hdfs.sink.seq; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertNotNull; + +import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest; +import org.testng.annotations.Test; + +public class HdfsTextSinkTests extends AbstractHdfsSinkTest<String, String> { + + @Override + protected void createSink() { + sink = new HdfsTextSink(); + } + + @Test(enabled = false) + public final void write100Test() throws Exception { + map.put("filenamePrefix", "write100TestText-seq"); + map.put("fileExtension", ".seq"); + map.put("syncInterval", 1000); + sink.open(map, mockSinkContext); + + assertNotNull(sink); + assertNotNull(mockRecord); + send(100); + + Thread.sleep(2000); + verify(mockRecord, times(100)).ack(); + sink.close(); + } + + @Test(enabled = false) + public final void write5000Test() throws Exception { + map.put("filenamePrefix", "write5000TestText-seq"); + map.put("fileExtension", ".seq"); + map.put("syncInterval", 1000); + sink.open(map, mockSinkContext); + + assertNotNull(sink); + assertNotNull(mockRecord); + send(5000); + + Thread.sleep(2000); + verify(mockRecord, times(5000)).ack(); + sink.close(); + } + + @Test(enabled = false) + public final void tenSecondTest() throws Exception { + map.put("filenamePrefix", "tenSecondTestText-seq"); + map.put("fileExtension", ".seq"); + map.put("syncInterval", 1000); + sink.open(map, mockSinkContext); + + assertNotNull(mockRecord); + + runFor(10); + sink.close(); + } + + @Test(enabled = false) + public final void bzip2CompressionTest() throws Exception { + map.put("filenamePrefix", "bzip2CompressionTestText-seq"); + map.put("compression", "BZIP2"); + map.remove("fileExtension"); + sink.open(map, mockSinkContext); + + assertNotNull(mockRecord); + + send(5000); + verify(mockRecord, times(5000)).ack(); + } + + @Test(enabled = false) + public final void deflateCompressionTest() throws Exception { + map.put("filenamePrefix", 
"deflateCompressionTestText-seq"); + map.put("compression", "DEFLATE"); + map.remove("fileExtension"); + sink.open(map, mockSinkContext); + + assertNotNull(mockRecord); + send(5000); + verify(mockRecord, times(5000)).ack(); + } +} diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSinkTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSinkTests.java new file mode 100644 index 0000000..98b8a61 --- /dev/null +++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSinkTests.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.hdfs.sink.text; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest; +import org.testng.annotations.Test; + +public class HdfsStringSinkTests extends AbstractHdfsSinkTest<String, String> { + + @Override + protected void createSink() { + sink = new HdfsStringSink(); + } + + @Test(enabled = false) + public final void write5000Test() throws Exception { + map.put("filenamePrefix", "write5000Test"); + map.put("fileExtension", ".txt"); + map.put("separator", '\n'); + sink.open(map, mockSinkContext); + send(5000); + sink.close(); + verify(mockRecord, times(5000)).ack(); + } + + @Test(enabled = false) + public final void fiveByTwoThousandTest() throws Exception { + map.put("filenamePrefix", "fiveByTwoThousandTest"); + map.put("fileExtension", ".txt"); + map.put("separator", '\n'); + sink.open(map, mockSinkContext); + + for (int idx = 1; idx < 6; idx++) { + send(2000); + } + sink.close(); + verify(mockRecord, times(2000 * 5)).ack(); + } + + @Test(enabled = false) + public final void tenSecondTest() throws Exception { + map.put("filenamePrefix", "tenSecondTest"); + map.put("fileExtension", ".txt"); + map.put("separator", '\n'); + sink.open(map, mockSinkContext); + runFor(10); + sink.close(); + } + + @Test(enabled = false) + public final void maxPendingRecordsTest() throws Exception { + map.put("filenamePrefix", "maxPendingRecordsTest"); + map.put("fileExtension", ".txt"); + map.put("separator", '\n'); + map.put("maxPendingRecords", 500); + sink.open(map, mockSinkContext); + runFor(10); + sink.close(); + } + + @Test(enabled = false) + public final void bzip2CompressionTest() throws Exception { + map.put("filenamePrefix", "bzip2CompressionTest"); + map.put("compression", "BZIP2"); + map.remove("fileExtension"); + map.put("separator", '\n'); + sink.open(map, mockSinkContext); + send(5000); + sink.close(); + verify(mockRecord, times(5000)).ack(); + 
} + + @Test(enabled = false) + public final void deflateCompressionTest() throws Exception { + map.put("filenamePrefix", "deflateCompressionTest"); + map.put("compression", "DEFLATE"); + map.put("fileExtension", ".deflate"); + map.put("separator", '\n'); + sink.open(map, mockSinkContext); + send(50000); + sink.close(); + verify(mockRecord, times(50000)).ack(); + } +} diff --git a/pulsar-io/pom.xml b/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml similarity index 50% copy from pulsar-io/pom.xml copy to pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml index 92f2186..31d1e98 100644 --- a/pulsar-io/pom.xml +++ b/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml @@ -18,29 +18,15 @@ under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <packaging>pom</packaging> - <parent> - <groupId>org.apache.pulsar</groupId> - <artifactId>pulsar</artifactId> - <version>2.2.0-incubating-SNAPSHOT</version> - </parent> - - <artifactId>pulsar-io</artifactId> - <name>Pulsar IO :: Parent</name> - - <modules> - <module>core</module> - <module>twitter</module> - <module>cassandra</module> - <module>aerospike</module> - <module>kafka</module> - <module>rabbitmq</module> - <module>kinesis</module> - <module>jdbc</module> - <module>data-genenator</module> - </modules> - -</project> +<configuration> + <property> + <name>fs.defaultFS</name> + <value>hdfs://0.0.0.0:8020</value> + </property> + <property> + <name>io.compression.codecs</name> + <value>org.apache.hadoop.io.compress.GzipCodec, + org.apache.hadoop.io.compress.DefaultCodec, + org.apache.hadoop.io.compress.SnappyCodec</value> + </property> +</configuration> diff --git a/pulsar-io/pom.xml b/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml similarity index 50% copy from pulsar-io/pom.xml copy to 
pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml index 92f2186..bb722f1 100644 --- a/pulsar-io/pom.xml +++ b/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml @@ -18,29 +18,17 @@ under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <packaging>pom</packaging> - <parent> - <groupId>org.apache.pulsar</groupId> - <artifactId>pulsar</artifactId> - <version>2.2.0-incubating-SNAPSHOT</version> - </parent> - - <artifactId>pulsar-io</artifactId> - <name>Pulsar IO :: Parent</name> - - <modules> - <module>core</module> - <module>twitter</module> - <module>cassandra</module> - <module>aerospike</module> - <module>kafka</module> - <module>rabbitmq</module> - <module>kinesis</module> - <module>jdbc</module> - <module>data-genenator</module> - </modules> - -</project> +<configuration> + <property> + <name>dfs.replication</name> + <value>1</value> + </property> + <property> + <name>dfs.client.use.datanode.hostname</name> + <value>true</value> + </property> + <property> + <name>dfs.support.append</name> + <value>true</value> + </property> +</configuration> diff --git a/pulsar-io/hdfs/src/test/resources/sinkConfig.yaml b/pulsar-io/hdfs/src/test/resources/sinkConfig.yaml new file mode 100644 index 0000000..5a19ee0 --- /dev/null +++ b/pulsar-io/hdfs/src/test/resources/sinkConfig.yaml @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +{ +"hdfsConfigResources": "core-site.xml", +"directory": "/foo/bar", +"filenamePrefix": "prefix", +"compression": "SNAPPY" +} \ No newline at end of file diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml index 92f2186..5c03370 100644 --- a/pulsar-io/pom.xml +++ b/pulsar-io/pom.xml @@ -39,6 +39,7 @@ <module>kafka</module> <module>rabbitmq</module> <module>kinesis</module> + <module>hdfs</module> <module>jdbc</module> <module>data-genenator</module> </modules> diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java new file mode 100644 index 0000000..58f781f --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.containers; + +import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; + +public class HdfsContainer extends ChaosContainer<HdfsContainer> { + + public static final String NAME = "HDFS"; + static final Integer[] PORTS = { 8020, 8032, 8088, 9000, 10020, 19888, 50010, 50020, 50070, 50070, 50090 }; + + private static final String IMAGE_NAME = "harisekhon/hadoop:latest"; + + public HdfsContainer(String clusterName) { + super(clusterName, IMAGE_NAME); + } + + @Override + public String getContainerName() { + return clusterName; + } + + @Override + protected void configure() { + super.configure(); + this.withNetworkAliases(NAME) + .withExposedPorts(PORTS) + .withCreateContainerCmdModifier(createContainerCmd -> { + createContainerCmd.withHostName(NAME); + createContainerCmd.withName(clusterName + "-" + NAME); + }) + .waitingFor(new HostPortWaitStrategy()); + } + +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 7b3cd4f..17634a1 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -72,7 +72,12 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { public void testCassandraArchiveSink() throws Exception { testSink(new CassandraSinkArchiveTester(), false); } - + + @Test(enabled = false) + public void testHdfsSink() throws Exception { + testSink(new HdfsSinkTester(), false); + } + @Test public void testJdbcSink() throws Exception { testSink(new JdbcSinkTester(), true); diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java new file mode 100644 index 0000000..46c5f24 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.io; + +import java.util.Map; + +import org.apache.pulsar.tests.integration.containers.HdfsContainer; +import org.testcontainers.containers.GenericContainer; + +import static com.google.common.base.Preconditions.checkState; + +public class HdfsSinkTester extends SinkTester { + + private static final String NAME = "HDFS"; + + private HdfsContainer hdfsCluster; + + public HdfsSinkTester() { + super(SinkType.HDFS); + + // TODO How do I get the core-site.xml, and hdfs-site.xml files from the container? 
+ sinkConfig.put("hdfsConfigResources", ""); + sinkConfig.put("directory", "/testing/test"); + } + + @Override + public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) { + GenericContainer<?> container = containers.get(NAME); + checkState(container instanceof HdfsContainer, "No HDFS service found in the cluster"); + this.hdfsCluster = (HdfsContainer) container; + } + + @Override + public void prepareSink() throws Exception { + // Create the test directory + hdfsCluster.execInContainer("/hadoop/bin/hdfs","dfs", "-mkdir", "/tmp/testing"); + hdfsCluster.execInContainer("/hadoop/bin/hdfs", "-chown", "tester:testing", "/tmp/testing"); + + // Execute all future commands as the "tester" user + hdfsCluster.execInContainer("export HADOOP_USER_NAME=tester"); + } + + @Override + public void validateSinkResult(Map<String, String> kvs) { + // TODO Auto-generated method stub + + } + +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java index 098b8bf..7f4b2d9 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java @@ -33,7 +33,8 @@ public abstract class SinkTester { UNDEFINED, CASSANDRA, KAFKA, - JDBC + JDBC, + HDFS } protected final SinkType sinkType; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java index 147f273..438c96e 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java @@ -20,6 +20,7 @@ package org.apache.pulsar.tests.integration.suites; import java.util.Map; import 
org.apache.pulsar.tests.integration.containers.CassandraContainer; +import org.apache.pulsar.tests.integration.containers.HdfsContainer; import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder; import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase; import org.testcontainers.containers.GenericContainer;