This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 209c448 Added Pulsar IO connector for local files (#2869)
209c448 is described below
commit 209c44883ad30b13b1a693a02aff8858c287401e
Author: David Kjerrumgaard <[email protected]>
AuthorDate: Thu Dec 27 09:49:04 2018 -0800
Added Pulsar IO connector for local files (#2869)
### Motivation
Added a Pulsar IO connector for consuming files from the local filesystem
### Modifications
Added a new module to the pulsar-io module that includes the Pulsar file
connector and its associated classes & tests
### Result
After this change, users will be able to consume files from the local
filesystem and have their contents published directly to a Pulsar topic.
---
pulsar-io/file/pom.xml | 70 ++++++
.../apache/pulsar/io/file/FileConsumerThread.java | 108 +++++++++
.../apache/pulsar/io/file/FileListingThread.java | 189 +++++++++++++++
.../java/org/apache/pulsar/io/file/FileRecord.java | 75 ++++++
.../java/org/apache/pulsar/io/file/FileSource.java | 70 ++++++
.../apache/pulsar/io/file/FileSourceConfig.java | 177 ++++++++++++++
.../apache/pulsar/io/file/ProcessedFileThread.java | 60 +++++
.../org/apache/pulsar/io/file/package-info.java | 19 ++
.../org/apache/pulsar/io/file/utils/GZipFiles.java | 96 ++++++++
.../org/apache/pulsar/io/file/utils/ZipFiles.java | 94 ++++++++
.../apache/pulsar/io/file/utils/package-info.java | 19 ++
.../resources/META-INF/services/pulsar-io.yaml | 22 ++
.../apache/pulsar/io/file/AbstractFileTests.java | 138 +++++++++++
.../pulsar/io/file/FileConsumerThreadTests.java | 148 ++++++++++++
.../pulsar/io/file/FileListingThreadTests.java | 238 +++++++++++++++++++
.../pulsar/io/file/FileSourceConfigTests.java | 121 ++++++++++
.../pulsar/io/file/ProcessedFileThreadTests.java | 259 +++++++++++++++++++++
.../apache/pulsar/io/file/TestFileGenerator.java | 91 ++++++++
.../pulsar/io/file/utils/GZipFilesTests.java | 68 ++++++
.../apache/pulsar/io/file/utils/ZipFilesTests.java | 68 ++++++
.../org/apache/pulsar/io/file/mislabelled.gz | 1 +
.../org/apache/pulsar/io/file/nonGzipFile.txt | 20 ++
.../org/apache/pulsar/io/file/validGzip.gz | Bin 0 -> 60 bytes
.../org/apache/pulsar/io/file/validZip.zip | Bin 0 -> 200 bytes
pulsar-io/file/src/test/resources/sinkConfig.yaml | 33 +++
pulsar-io/pom.xml | 1 +
26 files changed, 2185 insertions(+)
diff --git a/pulsar-io/file/pom.xml b/pulsar-io/file/pom.xml
new file mode 100644
index 0000000..c2d892f
--- /dev/null
+++ b/pulsar-io/file/pom.xml
@@ -0,0 +1,70 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <!-- Inherits from the pulsar-io parent POM. -->
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io</artifactId>
+ <version>2.3.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-io-file</artifactId>
+ <name>Pulsar IO :: File</name>
+
+ <!-- NOTE(review): the third-party dependencies below declare no <version>;
+ presumably the versions are managed by a parent POM - confirm. -->
+ <dependencies>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- NOTE(review): the plugin is declared without version or configuration;
+ presumably both are inherited from the parent build - confirm. -->
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileConsumerThread.java
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileConsumerThread.java
new file mode 100644
index 0000000..8c9803a
--- /dev/null
+++
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileConsumerThread.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.file.utils.GZipFiles;
+import org.apache.pulsar.io.file.utils.ZipFiles;
+
+/**
+ * Worker thread that consumes the contents of the files
+ * and publishes them to a Pulsar topic.
+ *
+ * <p>Each file taken from the work queue is tracked in the {@code inProcess}
+ * queue while it is being read, is published line by line through the
+ * {@link PushSource}, and is finally handed over to the
+ * {@code recentlyProcessed} queue.
+ */
+public class FileConsumerThread extends Thread {
+
+    private final PushSource<byte[]> source;
+    private final BlockingQueue<File> workQueue;
+    private final BlockingQueue<File> inProcess;
+    private final BlockingQueue<File> recentlyProcessed;
+
+    /**
+     * @param source            the source that each line is pushed to
+     * @param workQueue         files waiting to be consumed
+     * @param inProcess         files currently being consumed
+     * @param recentlyProcessed files whose consumption has finished
+     */
+    public FileConsumerThread(PushSource<byte[]> source,
+            BlockingQueue<File> workQueue,
+            BlockingQueue<File> inProcess,
+            BlockingQueue<File> recentlyProcessed) {
+        this.source = source;
+        this.workQueue = workQueue;
+        this.inProcess = inProcess;
+        this.recentlyProcessed = recentlyProcessed;
+    }
+
+    /**
+     * Takes files from the work queue and consumes them until the
+     * executing thread is interrupted.
+     */
+    @Override
+    public void run() {
+        try {
+            while (true) {
+                File file = workQueue.take();
+                // BlockingQueue.add either succeeds or throws, so no retry loop is needed.
+                inProcess.add(file);
+                consumeFile(file);
+            }
+        } catch (InterruptedException ie) {
+            // Terminate, but keep the interrupt visible to whoever runs this task.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Publishes every line of the given file and then moves the file from
+     * the in-process queue to the recently-processed queue.
+     *
+     * <p>NOTE(review): a file that fails part-way through is still handed to
+     * the recently-processed queue, exactly as before this change.
+     */
+    private void consumeFile(File file) {
+        final AtomicInteger idx = new AtomicInteger(1);
+        try (Stream<String> lines = getLines(file)) {
+            lines.forEachOrdered(line -> process(file, idx.getAndIncrement(), line));
+        } catch (IOException | java.io.UncheckedIOException e) {
+            // Read failures that happen while the stream is being iterated are
+            // reported as UncheckedIOException; catching them here keeps one
+            // unreadable file from killing the worker.
+            e.printStackTrace();
+        } finally {
+            // A single attempt is enough: retrying a failed remove() can never
+            // succeed and would spin forever.
+            inProcess.remove(file);
+            recentlyProcessed.add(file);
+        }
+    }
+
+    /**
+     * Opens the file as a stream of lines, transparently handling gzip and
+     * zip files.
+     *
+     * @return the lines of the file; an empty stream if {@code file} is null
+     * @throws IOException if the file cannot be opened
+     */
+    private Stream<String> getLines(File file) throws IOException {
+        if (file == null) {
+            // Never return null: the caller dereferences the stream immediately.
+            return Stream.empty();
+        } else if (GZipFiles.isGzip(file)) {
+            return GZipFiles.lines(Paths.get(file.getAbsolutePath()));
+        } else if (ZipFiles.isZip(file)) {
+            return ZipFiles.lines(Paths.get(file.getAbsolutePath()));
+        } else {
+            return Files.lines(Paths.get(file.getAbsolutePath()), Charset.defaultCharset());
+        }
+    }
+
+    /**
+     * Wraps a single line in a {@link FileRecord} and pushes it to the source.
+     */
+    private void process(File srcFile, int lineNumber, String line) {
+        source.consume(new FileRecord(srcFile, lineNumber, line.getBytes()));
+    }
+
+}
diff --git
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
new file mode 100644
index 0000000..a13b923
--- /dev/null
+++
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+/**
+ * Worker thread that checks the configured input directory for
+ * files that meet the provided filtering criteria, and publishes
+ * them to a work queue for processing by the FileConsumerThreads.
+ *
+ * <p>The thread stops when the thread executing {@link #run()} is interrupted.
+ */
+public class FileListingThread extends Thread {
+
+    private final AtomicLong queueLastUpdated = new AtomicLong(0L);
+    private final Lock listingLock = new ReentrantLock();
+    private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();
+    private final BlockingQueue<File> workQueue;
+    private final BlockingQueue<File> inProcess;
+    private final BlockingQueue<File> recentlyProcessed;
+
+    private final String inputDir;
+    private final boolean recurseDirs;
+    private final boolean keepOriginal;
+    private final long pollingInterval;
+
+    /**
+     * @param fileConfig        connector configuration; unset options fall back to
+     *                          recurse=true, keepFile=false, pollingInterval=10000
+     * @param workQueue         queue that newly discovered files are offered to
+     * @param inProcess         files currently being consumed (never re-queued)
+     * @param recentlyProcessed files already consumed (not re-queued unless
+     *                          the original files are kept)
+     */
+    public FileListingThread(FileSourceConfig fileConfig,
+            BlockingQueue<File> workQueue,
+            BlockingQueue<File> inProcess,
+            BlockingQueue<File> recentlyProcessed) {
+        this.workQueue = workQueue;
+        this.inProcess = inProcess;
+        this.recentlyProcessed = recentlyProcessed;
+
+        inputDir = fileConfig.getInputDirectory();
+        recurseDirs = Optional.ofNullable(fileConfig.getRecurse()).orElse(true);
+        keepOriginal = Optional.ofNullable(fileConfig.getKeepFile()).orElse(false);
+        pollingInterval = Optional.ofNullable(fileConfig.getPollingInterval()).orElse(10000L);
+        // createFileFilter reads recurseDirs and keepOriginal, so it must run last.
+        fileFilterRef.set(createFileFilter(fileConfig));
+    }
+
+    /**
+     * Periodically lists the input directory and offers every matching file
+     * that is not already queued, in process, or recently processed to the
+     * work queue. Returns once the executing thread has been interrupted.
+     */
+    @Override
+    public void run() {
+        // Check the current thread rather than 'this': when the instance is
+        // handed to an executor, run() executes on a pool thread.
+        while (!Thread.currentThread().isInterrupted()) {
+            if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingInterval)
+                    && listingLock.tryLock()) {
+                try {
+                    final File directory = new File(inputDir);
+                    final Set<File> listing = performListing(directory, fileFilterRef.get(), recurseDirs);
+
+                    if (!listing.isEmpty()) {
+
+                        // Remove any files that have been or are currently being processed.
+                        listing.removeAll(inProcess);
+                        if (!keepOriginal) {
+                            listing.removeAll(recentlyProcessed);
+                        }
+
+                        for (File f : listing) {
+                            if (!workQueue.contains(f)) {
+                                workQueue.offer(f);
+                            }
+                        }
+                        queueLastUpdated.set(System.currentTimeMillis());
+                    }
+
+                } finally {
+                    listingLock.unlock();
+                }
+            }
+
+            try {
+                sleep(pollingInterval - 1);
+            } catch (InterruptedException e) {
+                // An interrupt is the shutdown signal; swallowing it here would
+                // keep this loop, and therefore the executor, alive forever.
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+
+    /**
+     * Collects the files below {@code directory} that are accepted by
+     * {@code filter}.
+     *
+     * @param directory             the directory to list
+     * @param filter                decides which regular files are included
+     * @param recurseSubdirectories whether sub-directories are listed as well
+     * @return the accepted files; empty if the directory does not exist or
+     *         cannot be listed
+     * @throws IllegalStateException if the directory exists but is not both
+     *                               readable and writable
+     */
+    private Set<File> performListing(final File directory, final FileFilter filter,
+            final boolean recurseSubdirectories) {
+        final Set<File> queue = new HashSet<>();
+        // Existence must be checked before permissions: a missing directory is
+        // neither readable nor writable and would otherwise be reported as a
+        // permission problem instead of yielding an empty listing.
+        if (!directory.exists()) {
+            return queue;
+        }
+
+        Path p = directory.toPath();
+        if (!Files.isWritable(p) || !Files.isReadable(p)) {
+            throw new IllegalStateException("Directory '" + directory
+                    + "' does not have sufficient permissions (i.e., not writable and readable)");
+        }
+
+        final File[] children = directory.listFiles();
+        if (children == null) {
+            return queue;
+        }
+
+        for (final File child : children) {
+            if (child.isDirectory()) {
+                if (recurseSubdirectories) {
+                    queue.addAll(performListing(child, filter, recurseSubdirectories));
+                }
+            } else if (filter.accept(child)) {
+                queue.add(child);
+            }
+        }
+
+        return queue;
+    }
+
+    /**
+     * Builds the filter that decides whether a file is picked up, based on
+     * size, age, hidden flag, relative path, permissions and file name.
+     * Unset options default to: minimumSize=1, maximumSize=Double.MAX_VALUE,
+     * minimumFileAge=0, maximumFileAge=Long.MAX_VALUE, ignoreHiddenFiles=true,
+     * fileFilter="[^\\.].*". The path filter is only applied when recursing.
+     */
+    private FileFilter createFileFilter(FileSourceConfig fileConfig) {
+        final long minSize = Optional.ofNullable(fileConfig.getMinimumSize()).orElse(1);
+        final double maxSize = Optional.ofNullable(fileConfig.getMaximumSize()).orElse(Double.MAX_VALUE);
+        final long minAge = Optional.ofNullable(fileConfig.getMinimumFileAge()).orElse(0);
+        final long maxAge = Optional.ofNullable(fileConfig.getMaximumFileAge()).orElse(Long.MAX_VALUE);
+        final boolean ignoreHidden = Optional.ofNullable(fileConfig.getIgnoreHiddenFiles()).orElse(true);
+        final Pattern filePattern =
+                Pattern.compile(Optional.ofNullable(fileConfig.getFileFilter()).orElse("[^\\.].*"));
+        final String indir = fileConfig.getInputDirectory();
+        final String pathPatternStr = fileConfig.getPathFilter();
+        final Pattern pathPattern = (!recurseDirs || pathPatternStr == null)
+                ? null : Pattern.compile(pathPatternStr);
+
+        return new FileFilter() {
+            @Override
+            public boolean accept(final File file) {
+                final long size = file.length();
+                if (minSize > size || maxSize < size) {
+                    return false;
+                }
+                final long fileAge = System.currentTimeMillis() - file.lastModified();
+                if (minAge > fileAge || maxAge < fileAge) {
+                    return false;
+                }
+                if (ignoreHidden && file.isHidden()) {
+                    return false;
+                }
+                if (pathPattern != null) {
+                    // Match against the file's directory relative to the input directory.
+                    Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();
+                    if (reldir != null && !reldir.toString().isEmpty()
+                            && !pathPattern.matcher(reldir.toString()).matches()) {
+                        return false;
+                    }
+                }
+                // Verify that we have at least read permissions on the file we're considering grabbing
+                if (!Files.isReadable(file.toPath())) {
+                    return false;
+                }
+
+                /* Verify that if we're not keeping original that we have write
+                 * permissions on the directory the file is in
+                 */
+                if (!keepOriginal && !Files.isWritable(file.toPath().getParent())) {
+                    return false;
+                }
+                return filePattern.matcher(file.getName()).matches();
+            }
+        };
+    }
+}
diff --git
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileRecord.java
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileRecord.java
new file mode 100644
index 0000000..0fb9448
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileRecord.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import lombok.Data;
+
+import org.apache.pulsar.functions.api.Record;
+
+/**
+ * Record implementation carrying one line of a file read by the File Source.
+ * <ul>
+ *   <li>Key: the source file name, an underscore, and the line number.</li>
+ *   <li>Value: the bytes of that line.</li>
+ *   <li>Properties: the source file's name, absolute path, and
+ *       last-modified time.</li>
+ * </ul>
+ */
+@Data
+public class FileRecord implements Record<byte[]> {
+
+    public static final String FILE_NAME = "file.name";
+    public static final String FILE_ABSOLUTE_PATH = "file.path";
+    public static final String FILE_MODIFIED_TIME = "file.modified.time";
+
+    private final Optional<String> key;
+    private final byte[] value;
+    private final HashMap<String, String> userProperties = new HashMap<>();
+
+    /**
+     * @param srcFile    the file the line was read from
+     * @param lineNumber the number of the line within the file
+     * @param value      the contents of the line, in bytes
+     */
+    public FileRecord(File srcFile, int lineNumber, byte[] value) {
+        final String fileName = srcFile.getName();
+
+        this.value = value;
+        this.key = Optional.of(fileName + "_" + lineNumber);
+
+        setProperty(FILE_NAME, fileName);
+        setProperty(FILE_ABSOLUTE_PATH, srcFile.getAbsolutePath());
+        setProperty(FILE_MODIFIED_TIME, Long.toString(srcFile.lastModified()));
+    }
+
+    /** Returns the record key built from the file name and line number. */
+    @Override
+    public Optional<String> getKey() {
+        return this.key;
+    }
+
+    /** Returns the bytes of the line this record represents. */
+    @Override
+    public byte[] getValue() {
+        return this.value;
+    }
+
+    /** Returns the live map of user properties attached to this record. */
+    public Map<String, String> getProperties() {
+        return this.userProperties;
+    }
+
+    /** Adds or replaces a single user property. */
+    public void setProperty(String key, String value) {
+        this.userProperties.put(key, value);
+    }
+}
diff --git
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java
new file mode 100644
index 0000000..bc09c97
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.core.SourceContext;
+
+/**
+ * A simple connector to consume messages from the local file system.
+ * It can be configured to consume files recursively from a given
+ * directory, and can handle plain text, gzip, and zip formatted files.
+ */
+public class FileSource extends PushSource<byte[]> {
+
+    private ExecutorService executor;
+    private final BlockingQueue<File> workQueue = new LinkedBlockingQueue<>();
+    private final BlockingQueue<File> inProcess = new LinkedBlockingQueue<>();
+    private final BlockingQueue<File> recentlyProcessed = new LinkedBlockingQueue<>();
+
+    /**
+     * Loads and validates the configuration, then starts one listing thread,
+     * one clean-up thread, and the configured number of consumer threads.
+     *
+     * @param config        the connector configuration as key/value pairs
+     * @param sourceContext the source context (not used by this connector)
+     * @throws Exception if the configuration cannot be loaded or is invalid
+     */
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        FileSourceConfig fileConfig = FileSourceConfig.load(config);
+        fileConfig.validate();
+
+        // validate() accepts an explicitly null numWorkers, so fall back to a
+        // single worker instead of failing on unboxing.
+        final Integer configuredWorkers = fileConfig.getNumWorkers();
+        final int numWorkers = configuredWorkers != null ? configuredWorkers : 1;
+
+        // One extra for the File listing thread, and another for the cleanup thread
+        executor = Executors.newFixedThreadPool(numWorkers + 2);
+        executor.execute(new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed));
+        executor.execute(new ProcessedFileThread(fileConfig, recentlyProcessed));
+
+        for (int idx = 0; idx < numWorkers; idx++) {
+            executor.execute(new FileConsumerThread(this, workQueue, inProcess, recentlyProcessed));
+        }
+    }
+
+    /**
+     * Stops all worker threads, waiting briefly for an orderly shutdown
+     * before interrupting them. Safe to call when {@code open} was never
+     * called or failed before the executor was created.
+     */
+    @Override
+    public void close() throws Exception {
+        if (executor == null) {
+            return;
+        }
+
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(800, TimeUnit.MILLISECONDS)) {
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            executor.shutdownNow();
+            // Keep the interrupt visible to the caller.
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
new file mode 100644
index 0000000..efc7f63
--- /dev/null
+++
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Configuration class for the File Source Connector.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class FileSourceConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * The input directory from which to pull files.
+     */
+    private String inputDirectory;
+
+    /**
+     * Indicates whether or not to pull files from sub-directories.
+     */
+    private Boolean recurse;
+
+    /**
+     * If true, the file is not deleted after it has been processed and
+     * causes the file to be picked up continually.
+     */
+    private Boolean keepFile = Boolean.FALSE;
+
+    /**
+     * Only files whose names match the given regular expression will be picked up.
+     */
+    private String fileFilter = "[^\\.].*";
+
+    /**
+     * When 'recurse' property is true, then only sub-directories whose
+     * path matches the given regular expression will be scanned.
+     */
+    private String pathFilter;
+
+    /**
+     * The minimum age that a file must be in order to be processed; any file younger
+     * than this amount of time (according to last modification date) will be ignored.
+     */
+    private Integer minimumFileAge;
+
+    /**
+     * The maximum age that a file must be in order to be processed; any file older
+     * than this amount of time (according to last modification date) will be ignored.
+     */
+    private Long maximumFileAge;
+
+    /**
+     * The minimum size (in bytes) that a file must be in order to be processed.
+     */
+    private Integer minimumSize;
+
+    /**
+     * The maximum size (in bytes) that a file can be in order to be processed.
+     */
+    private Double maximumSize;
+
+    /**
+     * Indicates whether or not hidden files should be ignored or not.
+     */
+    private Boolean ignoreHiddenFiles;
+
+    /**
+     * Indicates how long to wait before performing a directory listing.
+     */
+    private Long pollingInterval;
+
+    /**
+     * The number of worker threads that will be processing the files.
+     * This allows you to process a larger number of files concurrently.
+     * However, setting this to a value greater than 1 will result in the data
+     * from multiple files being "intermingled" in the target topic.
+     */
+    private Integer numWorkers = 1;
+
+    /**
+     * Loads the configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file
+     * @throws IOException if the file cannot be read or parsed
+     */
+    public static FileSourceConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), FileSourceConfig.class);
+    }
+
+    /**
+     * Loads the configuration from a map of property names to values.
+     *
+     * @param map the configuration properties
+     * @throws IOException if the map cannot be converted
+     */
+    public static FileSourceConfig load(Map<String, Object> map) throws IOException {
+        // One mapper is enough for both the write and the read step.
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), FileSourceConfig.class);
+    }
+
+    /**
+     * Checks that the configuration is usable.
+     *
+     * @throws IllegalArgumentException if the input directory is unset, missing,
+     *         unreadable, or (when consumed files are to be deleted) not writable;
+     *         if a filter is not a valid regular expression; or if a numeric
+     *         property is out of range
+     */
+    public void validate() {
+        if (StringUtils.isBlank(inputDirectory)) {
+            throw new IllegalArgumentException("Required property not set.");
+        } else if (Files.notExists(Paths.get(inputDirectory), LinkOption.NOFOLLOW_LINKS)) {
+            throw new IllegalArgumentException("Specified input directory does not exist");
+        } else if (!Files.isReadable(Paths.get(inputDirectory))) {
+            throw new IllegalArgumentException("Specified input directory is not readable");
+        } else if (!Optional.ofNullable(keepFile).orElse(false)
+                && !Files.isWritable(Paths.get(inputDirectory))) {
+            // Files are deleted only when keepFile is false, so that is the
+            // case in which the directory has to be writable.
+            throw new IllegalArgumentException("You have requested the consumed files to be deleted, but the "
+                    + "source directory is not writeable.");
+        }
+
+        if (StringUtils.isNotBlank(fileFilter)) {
+            try {
+                Pattern.compile(fileFilter);
+            } catch (final PatternSyntaxException psEx) {
+                throw new IllegalArgumentException("Invalid Regex pattern provided for fileFilter", psEx);
+            }
+        }
+
+        // pathFilter is compiled as a regular expression later on, so reject a
+        // malformed one here together with the other configuration errors.
+        if (StringUtils.isNotBlank(pathFilter)) {
+            try {
+                Pattern.compile(pathFilter);
+            } catch (final PatternSyntaxException psEx) {
+                throw new IllegalArgumentException("Invalid Regex pattern provided for pathFilter", psEx);
+            }
+        }
+
+        if (minimumFileAge != null && minimumFileAge < 0) {
+            throw new IllegalArgumentException("The property minimumFileAge must be non-negative");
+        }
+
+        if (maximumFileAge != null && maximumFileAge < 0) {
+            throw new IllegalArgumentException("The property maximumFileAge must be non-negative");
+        }
+
+        if (minimumSize != null && minimumSize < 0) {
+            throw new IllegalArgumentException("The property minimumSize must be non-negative");
+        }
+
+        if (maximumSize != null && maximumSize < 0) {
+            throw new IllegalArgumentException("The property maximumSize must be non-negative");
+        }
+
+        if (pollingInterval != null && pollingInterval <= 0) {
+            throw new IllegalArgumentException("The property pollingInterval must be greater than zero");
+        }
+
+        if (numWorkers != null && numWorkers <= 0) {
+            throw new IllegalArgumentException("The property numWorkers must be greater than zero");
+        }
+    }
+}
\ No newline at end of file
diff --git
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/ProcessedFileThread.java
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/ProcessedFileThread.java
new file mode 100644
index 0000000..65153e3
--- /dev/null
+++
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/ProcessedFileThread.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Worker thread that cleans up all the files that have been processed.
+ *
+ * <p>Files are taken from the {@code recentlyProcessed} queue one at a time
+ * and deleted unless the configuration asks for the originals to be kept.
+ */
+public class ProcessedFileThread extends Thread {
+
+    private final BlockingQueue<File> recentlyProcessed;
+    private final boolean keepOriginal;
+
+    /**
+     * @param fileConfig        connector configuration; an unset keepFile
+     *                          option is treated as false
+     * @param recentlyProcessed queue of files whose consumption has finished
+     */
+    public ProcessedFileThread(FileSourceConfig fileConfig, BlockingQueue<File> recentlyProcessed) {
+        keepOriginal = Optional.ofNullable(fileConfig.getKeepFile()).orElse(false);
+        this.recentlyProcessed = recentlyProcessed;
+    }
+
+    /**
+     * Handles processed files until the executing thread is interrupted.
+     */
+    @Override
+    public void run() {
+        try {
+            while (true) {
+                File file = recentlyProcessed.take();
+                handle(file);
+            }
+        } catch (InterruptedException ie) {
+            // Terminate, but keep the interrupt visible to whoever runs this task.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Deletes the file unless originals are kept; a failed delete is
+     * reported and otherwise ignored so that clean-up continues.
+     */
+    private void handle(File f) {
+        if (!keepOriginal) {
+            try {
+                Files.deleteIfExists(f.toPath());
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
diff --git
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/package-info.java
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/package-info.java
new file mode 100644
index 0000000..0795343
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
\ No newline at end of file
diff --git
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/GZipFiles.java
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/GZipFiles.java
new file mode 100644
index 0000000..aa38356
--- /dev/null
+++
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/GZipFiles.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file.utils;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PushbackInputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.stream.Stream;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.io.IOUtils;
+
+/**
+ * Helper class that provides helper methods for working with
+ * gzip-formatted files.
+ */
+public class GZipFiles {
+
+    /**
+     * Returns true if the given file starts with the two-byte gzip magic
+     * number (0x1f 0x8b).
+     *
+     * @param f
+     *            The file to inspect.
+     * @return true if the signature matches; false otherwise, including when
+     *         the file cannot be read or holds fewer than two bytes.
+     */
+    public static boolean isGzip(File f) {
+        // try-with-resources closes the stream on every path, so neither a
+        // finally block nor the deprecated IOUtils.closeQuietly is needed.
+        try (InputStream input = new FileInputStream(f)) {
+            byte[] signature = new byte[2];
+            int len = input.read(signature);
+            return len == 2
+                    && signature[0] == (byte) 0x1f
+                    && signature[1] == (byte) 0x8b;
+        } catch (final Exception e) {
+            return false;
+        }
+    }
+
+    /**
+     * Get a lazily loaded stream of lines from a gzipped file, similar to
+     * {@link Files#lines(java.nio.file.Path)}. Closing the returned stream
+     * closes the underlying file.
+     *
+     * @param path
+     *            The path to the gzipped file.
+     * @return stream with lines.
+     * @throws UncheckedIOException
+     *             if the file cannot be opened or its gzip header is invalid.
+     */
+    public static Stream<String> lines(Path path) {
+        InputStream fileStream = null;
+
+        try {
+            fileStream = Files.newInputStream(path);
+            // The GZIPInputStream constructor reads the gzip header and throws
+            // on malformed input; the raw stream is closed below in that case.
+            GZIPInputStream gzipStream = new GZIPInputStream(fileStream);
+            BufferedReader reader = new BufferedReader(new InputStreamReader(gzipStream));
+            return reader.lines().onClose(() -> closeSafely(reader));
+        } catch (IOException e) {
+            closeSafely(fileStream);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Closes the given resource, ignoring a null argument and any
+     * IOException raised by close().
+     */
+    private static void closeSafely(Closeable closeable) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (IOException e) {
+                // Ignore
+            }
+        }
+    }
+}
diff --git
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/ZipFiles.java
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/ZipFiles.java
new file mode 100644
index 0000000..09cd0c2
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/ZipFiles.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file.utils;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.stream.Stream;
+import java.util.zip.ZipInputStream;
+
+import org.apache.commons.io.IOUtils;
+
+/**
+ * Helper class that provides helper methods for working with
+ * zip-formatted files.
+ */
+public class ZipFiles {
+
+    /**
+     * Returns true if the given file is a zip file, i.e. its first four
+     * bytes are the zip local-file-header signature (0x504b0304).
+     *
+     * @param f
+     *            The file to inspect.
+     * @return true if the signature matches; false otherwise, including when
+     *         the file cannot be read or holds fewer than four bytes.
+     */
+    public static boolean isZip(File f) {
+        // try-with-resources guarantees the stream is closed even when
+        // readInt() fails, e.g. with an EOFException on a very short file.
+        try (DataInputStream in = new DataInputStream(
+                new BufferedInputStream(new FileInputStream(f)))) {
+            return in.readInt() == 0x504b0304;
+        } catch (final Exception e) {
+            return false;
+        }
+    }
+
+    /**
+     * Get a lazily loaded stream of lines from the first entry of a zipped
+     * file, similar to {@link Files#lines(java.nio.file.Path)}. Closing the
+     * returned stream closes the underlying file.
+     *
+     * @param path
+     *            The path to the zipped file.
+     * @return stream with lines; empty if the archive has no entries.
+     * @throws UncheckedIOException
+     *             if the file cannot be opened or its first entry header
+     *             cannot be read.
+     */
+    public static Stream<String> lines(Path path) {
+        InputStream fileStream = null;
+
+        try {
+            fileStream = Files.newInputStream(path);
+            ZipInputStream zipStream = new ZipInputStream(fileStream);
+            // A ZipInputStream reports end-of-stream until it has been
+            // positioned on an entry, so move to the first entry before
+            // reading any lines.
+            zipStream.getNextEntry();
+            BufferedReader reader = new BufferedReader(new InputStreamReader(zipStream));
+            return reader.lines().onClose(() -> closeSafely(reader));
+        } catch (IOException e) {
+            closeSafely(fileStream);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Closes the given resource, ignoring a null argument and any
+     * IOException raised by close().
+     */
+    private static void closeSafely(Closeable closeable) {
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (IOException e) {
+                // Ignore
+            }
+        }
+    }
+}
diff --git
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/package-info.java
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/package-info.java
new file mode 100644
index 0000000..2b21e91
--- /dev/null
+++
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file.utils;
\ No newline at end of file
diff --git a/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..df455c2
--- /dev/null
+++ b/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: file
+description: Reads data from local filesystem
+sourceClass: org.apache.pulsar.io.file.FileSource
diff --git
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
new file mode 100644
index 0000000..5934151
--- /dev/null
+++
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+
+public abstract class AbstractFileTests {
+
+ public static final String TMP_DIR = "/tmp/foo";
+
+ protected BlockingQueue<File> workQueue;
+ protected BlockingQueue<File> inProcess;
+ protected BlockingQueue<File> recentlyProcessed;
+ protected BlockingQueue<File> producedFiles;
+
+ protected TestFileGenerator generatorThread;
+ protected FileListingThread listingThread;
+ protected ExecutorService executor;
+
+ @BeforeMethod
+ public void init() throws IOException {
+
+ // Create the directory we are going to read from
+ Path directory = Paths.get(TMP_DIR);
+
+ if (!Files.exists(directory, LinkOption.NOFOLLOW_LINKS)) {
+ Files.createDirectory(directory, getPermissions());
+ }
+
+ workQueue = Mockito.spy(new LinkedBlockingQueue<>());
+ inProcess = Mockito.spy(new LinkedBlockingQueue<>());
+ recentlyProcessed = Mockito.spy(new LinkedBlockingQueue<>());
+ producedFiles = Mockito.spy(new LinkedBlockingQueue<>());
+ executor = Executors.newFixedThreadPool(10);
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ // Shutdown all of the processing threads
+ stopThreads();
+
+ // Delete the directory and all the files
+ cleanUp();
+ }
+
+ protected static final void cleanUp() throws IOException {
+ Path directory = Paths.get(TMP_DIR);
+
+ if (!Files.exists(directory, LinkOption.NOFOLLOW_LINKS)) {
+ return;
+ }
+
+ Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes
attrs) throws IOException {
+ Files.delete(file);
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir, IOException
exc) throws IOException {
+ Files.delete(dir);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ }
+
+ protected void stopThreads() throws Exception {
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(800, TimeUnit.MILLISECONDS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ executor.shutdownNow();
+ }
+ }
+
+ protected final void generateFiles(int numFiles) throws IOException,
InterruptedException, ExecutionException {
+ generateFiles(numFiles, 1, TMP_DIR);
+ }
+
+ protected final void generateFiles(int numFiles, int numLines) throws
IOException, InterruptedException, ExecutionException {
+ generateFiles(numFiles, numLines, TMP_DIR);
+ }
+
+ protected final void generateFiles(int numFiles, int numLines, String
directory) throws IOException, InterruptedException, ExecutionException {
+ generatorThread = new TestFileGenerator(producedFiles, numFiles, 1,
numLines, directory, "prefix", ".txt", getPermissions());
+ Future<?> f = executor.submit(generatorThread);
+ f.get();
+ }
+
+ protected static final FileAttribute<Set<PosixFilePermission>>
getPermissions() {
+ Set<PosixFilePermission> perms =
PosixFilePermissions.fromString("rwxrwxrwx");
+ return PosixFilePermissions.asFileAttribute(perms);
+ }
+
+}
diff --git
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
new file mode 100644
index 0000000..a21633d
--- /dev/null
+++
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.PushSource;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+/**
+ * Tests that run a FileListingThread and a FileConsumerThread together over
+ * generated files and verify the queue hand-offs and the number of records
+ * pushed to the mocked source.
+ */
+@SuppressWarnings("unchecked")
+public class FileConsumerThreadTests extends AbstractFileTests {
+
+    private PushSource<byte[]> consumer;
+    private FileConsumerThread consumerThread;
+
+    @Test
+    public final void singleFileTest() throws IOException {
+        consumeAndVerify(1, 1);
+    }
+
+    @Test
+    public final void mulitpleFileTest() throws IOException {
+        consumeAndVerify(50, 2);
+    }
+
+    @Test
+    public final void multiLineFileTest() throws IOException {
+        consumeAndVerify(1, 10);
+    }
+
+    /**
+     * Generates the requested files, runs the listing and consumer threads
+     * for two seconds, and then verifies that every file went through each
+     * queue exactly once and that one record per line was consumed.
+     *
+     * @param numFiles
+     *            number of files to generate in TMP_DIR.
+     * @param numLines
+     *            number of lines written to each file.
+     */
+    private void consumeAndVerify(int numFiles, int numLines) throws IOException {
+
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
+
+        Map<String, Object> map = new HashMap<String, Object>();
+        map.put("inputDirectory", TMP_DIR);
+
+        try {
+            generateFiles(numFiles, numLines);
+            listingThread = new FileListingThread(FileSourceConfig.load(map),
+                    workQueue, inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue,
+                    inProcess, recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            Thread.sleep(2000);
+
+            // Each generated file is handed through every queue exactly once
+            for (File produced : producedFiles) {
+                verify(workQueue, times(1)).offer(produced);
+                verify(inProcess, times(1)).add(produced);
+                verify(inProcess, times(1)).remove(produced);
+                verify(recentlyProcessed, times(1)).add(produced);
+            }
+
+            // ...and no files other than the generated ones were handled
+            verify(workQueue, times(numFiles)).offer(any(File.class));
+            verify(workQueue, atLeast(numFiles)).take();
+            verify(inProcess, times(numFiles)).add(any(File.class));
+            verify(inProcess, times(numFiles)).remove(any(File.class));
+            verify(recentlyProcessed, times(numFiles)).add(any(File.class));
+
+            // One record is expected per line of every file
+            verify(consumer, times(numFiles * numLines))
+                    .consume((Record<byte[]>) any(Record.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+}
diff --git
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
new file mode 100644
index 0000000..855498e
--- /dev/null
+++
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import static org.testng.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for FileListingThread: each test generates files under TMP_DIR,
+ * runs the listing thread for two seconds with a given configuration and
+ * verifies how many files were offered to the work queue.
+ */
+public class FileListingThreadTests extends AbstractFileTests {
+
+    @Test
+    public final void singleFileTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+
+        try {
+            generateFiles(1);
+            runListingThread(map);
+            verify(producedFiles, times(1)).put(any(File.class));
+            verify(workQueue, times(1)).offer(any(File.class));
+            verifyEachProducedFileOfferedOnce();
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    @Test
+    public final void fiftyFileTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+
+        try {
+            generateFiles(50);
+            runListingThread(map);
+            verify(workQueue, times(50)).offer(any(File.class));
+            verifyEachProducedFileOfferedOnce();
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    @Test
+    public final void minimumSizeTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+
+        try {
+            // Create 50 zero size files
+            generateFiles(50, 0);
+            runListingThread(map);
+            verify(workQueue, times(0)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    @Test
+    public final void maximumSizeTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+        map.put("maximumSize", "1000");
+
+        try {
+            // Create 5 files that exceed the limit and 45 that don't
+            generateFiles(5, 1000);
+            generateFiles(45, 10);
+            runListingThread(map);
+            verify(workQueue, times(45)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }
+    }
+
+    @Test
+    public final void minimumAgeTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+        map.put("minimumFileAge", "5000");
+
+        try {
+            // Create 5 files that will be too "new" for processing
+            generateFiles(5);
+            runListingThread(map);
+            verify(workQueue, times(0)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }
+    }
+
+    @Test
+    public final void maximumAgeTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+        map.put("maximumFileAge", "5000");
+
+        try {
+            // Create 5 files and let them age for 5 seconds, so that they are
+            // too "old" for processing by the time the listing thread starts
+            generateFiles(5);
+            Thread.sleep(5000);
+
+            // Create 5 fresh files; only these are expected to be processed
+            generateFiles(5);
+            runListingThread(map);
+            verify(workQueue, times(5)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }
+    }
+
+    @Test
+    public final void doRecurseTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+        map.put("recurse", Boolean.TRUE);
+
+        try {
+            // Create 5 files in the root folder
+            generateFiles(5);
+
+            // Create 5 files in a sub-folder
+            generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir");
+            runListingThread(map);
+            verify(workQueue, times(10)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }
+    }
+
+    @Test
+    public final void doNotRecurseTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+        map.put("recurse", Boolean.FALSE);
+
+        try {
+            // Create 5 files in the root folder
+            generateFiles(5);
+
+            // Create 5 files in a sub-folder
+            generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir");
+            runListingThread(map);
+            verify(workQueue, times(5)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }
+    }
+
+    @Test
+    public final void pathFilterTest() throws IOException {
+
+        Map<String, Object> map = newConfig();
+        map.put("recurse", Boolean.TRUE);
+        map.put("pathFilter", "sub-.*");
+
+        try {
+            // Create 5 files in a sub-folder matching the filter and 5 in one
+            // that does not match
+            generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir-a");
+            generateFiles(5, 1, TMP_DIR + File.separator + "dir-b");
+            runListingThread(map);
+            verify(workQueue, times(5)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }
+    }
+
+    /**
+     * Returns a mutable configuration map with only inputDirectory set to
+     * TMP_DIR.
+     */
+    private static Map<String, Object> newConfig() {
+        Map<String, Object> map = new HashMap<String, Object>();
+        map.put("inputDirectory", TMP_DIR);
+        return map;
+    }
+
+    /**
+     * Starts a FileListingThread built from the given configuration on the
+     * shared executor and waits two seconds for it to do its work.
+     */
+    private void runListingThread(Map<String, Object> map)
+            throws IOException, InterruptedException {
+        listingThread = new FileListingThread(FileSourceConfig.load(map),
+                workQueue, inProcess, recentlyProcessed);
+        executor.execute(listingThread);
+        Thread.sleep(2000);
+    }
+
+    /**
+     * Verifies that every generated file was offered to the work queue
+     * exactly once.
+     */
+    private void verifyEachProducedFileOfferedOnce() {
+        for (File produced : producedFiles) {
+            verify(workQueue, times(1)).offer(produced);
+        }
+    }
+}
diff --git
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
new file mode 100644
index 0000000..64144e6
--- /dev/null
+++
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import static org.testng.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.testng.annotations.Test;
+
+/**
+ * Tests for loading a FileSourceConfig from a YAML file or a map, and for
+ * the errors raised by validate() on bad property values.
+ */
+public class FileSourceConfigTests {
+
+    @Test
+    public final void loadFromYamlFileTest() throws IOException {
+        String yamlPath = getFile("sinkConfig.yaml").getAbsolutePath();
+        assertNotNull(FileSourceConfig.load(yamlPath));
+    }
+
+    @Test
+    public final void loadFromMapTest() throws IOException {
+        Map<String, Object> props = new HashMap<String, Object>();
+        props.put("inputDirectory", "/tmp");
+        props.put("keepFile", false);
+
+        assertNotNull(FileSourceConfig.load(props));
+    }
+
+    @Test
+    public final void validValidateTest() throws IOException {
+        Map<String, Object> props = new HashMap<String, Object>();
+        props.put("inputDirectory", "/tmp");
+
+        loadAndValidate(props);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "Required property not set.")
+    public final void missingRequiredPropertiesTest() throws IOException {
+        Map<String, Object> props = new HashMap<String, Object>();
+        props.put("pathFilter", "/");
+
+        loadAndValidate(props);
+    }
+
+    @Test(expectedExceptions = com.fasterxml.jackson.databind.exc.InvalidFormatException.class)
+    public final void InvalidBooleanPropertyTest() throws IOException {
+        Map<String, Object> props = new HashMap<String, Object>();
+        props.put("inputDirectory", "/");
+        props.put("recurse", "not a boolean");
+
+        loadAndValidate(props);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "The property pollingInterval must be greater than zero")
+    public final void ZeroValueTest() throws IOException {
+        Map<String, Object> props = new HashMap<String, Object>();
+        props.put("inputDirectory", "/");
+        props.put("pollingInterval", 0);
+
+        loadAndValidate(props);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "The property minimumFileAge must be non-negative")
+    public final void NegativeValueTest() throws IOException {
+        Map<String, Object> props = new HashMap<String, Object>();
+        props.put("inputDirectory", "/");
+        props.put("minimumFileAge", "-50");
+
+        loadAndValidate(props);
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "Invalid Regex pattern provided for fileFilter")
+    public final void invalidFileFilterTest() throws IOException {
+        Map<String, Object> props = new HashMap<String, Object>();
+        props.put("inputDirectory", "/");
+        props.put("fileFilter", "\\"); // Results in a single '\' being sent.
+
+        loadAndValidate(props);
+    }
+
+    /**
+     * Loads a config from the given map, asserts it is non-null and then
+     * validates it, letting any exception propagate to the test.
+     */
+    private static void loadAndValidate(Map<String, Object> props) throws IOException {
+        FileSourceConfig config = FileSourceConfig.load(props);
+        assertNotNull(config);
+        config.validate();
+    }
+
+    /** Resolves a classpath resource by name to a File. */
+    private File getFile(String name) {
+        return new File(getClass().getClassLoader().getResource(name).getFile());
+    }
+}
diff --git
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
new file mode 100644
index 0000000..98d35a9
--- /dev/null
+++
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.PushSource;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the {@link FileListingThread}, {@link FileConsumerThread} and
+ * {@link ProcessedFileThread} together against generated files and uses
+ * Mockito verifications on the shared collections to check how many times
+ * each file moved through the pipeline.
+ */
+@SuppressWarnings("unchecked")
+public class ProcessedFileThreadTests extends AbstractFileTests {
+
+    /** Upper bound on how long a test waits for the pipeline to drain. */
+    private static final long DRAIN_TIMEOUT_MS = 60_000L;
+
+    private PushSource<byte[]> consumer;
+    private FileConsumerThread consumerThread;
+    private ProcessedFileThread cleanupThread;
+    private FileSourceConfig fileConfig;
+
+    /**
+     * One generated file with keepFile=false: every pipeline stage is expected
+     * to see that file exactly once.
+     */
+    @Test
+    public final void singleFileTest() throws IOException {
+
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("keepFile", Boolean.FALSE);
+
+        try {
+            generateFiles(1);
+            fileConfig = FileSourceConfig.load(map);
+            listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            executor.execute(cleanupThread);
+            Thread.sleep(2000);
+
+            verifyEachProducedFileHandledOnce();
+
+            verify(workQueue, times(1)).offer(any(File.class));
+            verify(workQueue, atLeast(1)).take();
+            verify(inProcess, times(1)).add(any(File.class));
+            verify(inProcess, times(1)).remove(any(File.class));
+            verify(recentlyProcessed, times(1)).add(any(File.class));
+            verify(recentlyProcessed, times(2)).take();
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * Fifty generated files with keepFile=false: every file is expected to pass
+     * through each pipeline stage exactly once.
+     */
+    @Test
+    public final void mulitpleFileTest() throws IOException {
+
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("keepFile", Boolean.FALSE);
+
+        try {
+            generateFiles(50);
+            fileConfig = FileSourceConfig.load(map);
+            listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            executor.execute(cleanupThread);
+            Thread.sleep(2000);
+
+            verifyEachProducedFileHandledOnce();
+
+            verify(workQueue, times(50)).offer(any(File.class));
+            verify(workQueue, atLeast(50)).take();
+            verify(inProcess, times(50)).add(any(File.class));
+            verify(inProcess, times(50)).remove(any(File.class));
+            verify(recentlyProcessed, times(50)).add(any(File.class));
+            verify(recentlyProcessed, times(51)).take();
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * One generated file with keepFile=true and a one second polling interval:
+     * the same file is expected to be picked up repeatedly (at least four
+     * times) while the test sleeps.
+     */
+    @Test
+    public final void keepFileTest() throws IOException {
+
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("keepFile", Boolean.TRUE);
+        map.put("pollingInterval", 1000L);
+
+        try {
+            generateFiles(1);
+            fileConfig = FileSourceConfig.load(map);
+            listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            executor.execute(cleanupThread);
+            Thread.sleep(7900); // Should pull the same file 5 times?
+
+            for (File produced : producedFiles) {
+                verify(workQueue, atLeast(4)).offer(produced);
+                verify(inProcess, atLeast(4)).add(produced);
+                verify(inProcess, atLeast(4)).remove(produced);
+                verify(recentlyProcessed, atLeast(4)).add(produced);
+            }
+
+            verify(recentlyProcessed, atLeast(5)).take();
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * Generates files continuously for thirty seconds while a single consumer
+     * runs, then checks that every generated file was processed exactly once.
+     */
+    @Test
+    public final void continuousRunTest() throws IOException {
+
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("keepFile", Boolean.FALSE);
+        map.put("pollingInterval", 100);
+        fileConfig = FileSourceConfig.load(map);
+
+        try {
+            // Start producing files, with a .1 sec delay between
+            generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 1,
+                    TMP_DIR, "continuous", ".txt", getPermissions());
+            executor.execute(generatorThread);
+
+            listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            executor.execute(cleanupThread);
+
+            // Run for 30 seconds
+            Thread.sleep(30000);
+
+            // Stop producing files
+            generatorThread.halt();
+
+            // Let the consumer catch up
+            awaitPipelineDrained();
+
+            // Make sure every single file was processed.
+            verifyEachProducedFileHandledOnce();
+
+        } catch (InterruptedException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * Same scenario as {@link #continuousRunTest()} but with three consumer
+     * threads sharing the work queue; each file must still be processed
+     * exactly once.
+     */
+    @Test
+    public final void multipleConsumerTest() throws IOException {
+
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class));
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("keepFile", Boolean.FALSE);
+        map.put("pollingInterval", 100);
+        fileConfig = FileSourceConfig.load(map);
+
+        try {
+            // Start producing files, with a .1 sec delay between
+            generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 1,
+                    TMP_DIR, "continuous", ".txt", getPermissions());
+            executor.execute(generatorThread);
+
+            listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            FileConsumerThread consumerThread2 =
+                    new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            FileConsumerThread consumerThread3 =
+                    new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            executor.execute(consumerThread2);
+            executor.execute(consumerThread3);
+            executor.execute(cleanupThread);
+
+            // Run for 30 seconds
+            Thread.sleep(30000);
+
+            // Stop producing files
+            generatorThread.halt();
+
+            // Let the consumer catch up
+            awaitPipelineDrained();
+
+            // Make sure every single file was processed exactly once.
+            verifyEachProducedFileHandledOnce();
+
+        } catch (InterruptedException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * Waits, polling every two seconds, until the work queue, the in-process
+     * collection and the recently-processed queue are all empty. The wait
+     * continues while ANY of them still holds a file; it gives up after
+     * {@link #DRAIN_TIMEOUT_MS} so a stuck pipeline surfaces as a failed
+     * verification instead of a hung build.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while sleeping
+     */
+    private void awaitPipelineDrained() throws InterruptedException {
+        final long deadline = System.currentTimeMillis() + DRAIN_TIMEOUT_MS;
+        while ((!workQueue.isEmpty() || !inProcess.isEmpty() || !recentlyProcessed.isEmpty())
+                && System.currentTimeMillis() < deadline) {
+            Thread.sleep(2000);
+        }
+    }
+
+    /**
+     * Verifies that each generated file was offered to the work queue, added
+     * to and removed from the in-process collection, and added to the
+     * recently-processed queue exactly once.
+     */
+    private void verifyEachProducedFileHandledOnce() {
+        for (File produced : producedFiles) {
+            verify(workQueue, times(1)).offer(produced);
+            verify(inProcess, times(1)).add(produced);
+            verify(inProcess, times(1)).remove(produced);
+            verify(recentlyProcessed, times(1)).add(produced);
+        }
+    }
+}
diff --git
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/TestFileGenerator.java
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/TestFileGenerator.java
new file mode 100644
index 0000000..21ad852
--- /dev/null
+++
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/TestFileGenerator.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileAttribute;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+
+/**
+ * Test helper thread that writes temp files filled with random letters into a
+ * directory and reports every created file on a shared queue.
+ */
+public class TestFileGenerator extends Thread {
+
+    // Allows us to communicate back which files we generated
+    private final BlockingQueue<File> producedFiles;
+    private final int numFiles;
+    private final long delay;
+    private final int numLines;
+    private final String prefix;
+    private final String suffix;
+    private final FileAttribute<?>[] attrs;
+    private final Path tempDir;
+    // Cleared by halt() from a different thread than the one executing run(),
+    // so it has to be volatile for the loop to observe the change.
+    private volatile boolean keepRunning = true;
+
+    /**
+     * Creates the generator and makes sure the target directory exists.
+     *
+     * @param producedFiles queue that receives every file this generator creates
+     * @param numFiles maximum number of files to create
+     * @param delay pause in milliseconds after each created file
+     * @param numLines number of random lines written to each file
+     * @param dir directory in which the files are created (created if absent)
+     * @param prefix file name prefix passed to {@link Files#createTempFile}
+     * @param suffix file name suffix passed to {@link Files#createTempFile}
+     * @param attrs file attributes applied to the directory and to each file
+     * @throws IOException if the directory cannot be created
+     */
+    public TestFileGenerator(BlockingQueue<File> producedFiles, int numFiles, long delay, int numLines,
+            String dir, String prefix, String suffix, FileAttribute<?>... attrs) throws IOException {
+        this.numFiles = numFiles;
+        this.delay = delay;
+        this.numLines = numLines;
+        this.producedFiles = producedFiles;
+        this.prefix = prefix;
+        this.suffix = suffix;
+        this.attrs = attrs;
+        tempDir = Files.createDirectories(Paths.get(dir), attrs);
+    }
+
+    /**
+     * Creates files one after another, sleeping {@code delay} milliseconds
+     * after each, until {@code numFiles} were created, {@link #halt()} was
+     * called, or the thread is interrupted.
+     */
+    @Override
+    public void run() {
+        int counter = 0;
+        while (keepRunning && (counter++ < numFiles)) {
+            createFile();
+            try {
+                Thread.sleep(delay);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to whoever owns this thread, then stop generating.
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+
+    /** Asks the generation loop to stop before its next iteration. */
+    public void halt() {
+        keepRunning = false;
+    }
+
+    /**
+     * Creates one temp file, fills it with {@code numLines} lines of 50 random
+     * letters and publishes it on {@code producedFiles}.
+     */
+    private void createFile() {
+        try {
+            Path path = Files.createTempFile(tempDir, prefix, suffix, attrs);
+            try (OutputStream out = Files.newOutputStream(path, StandardOpenOption.APPEND)) {
+                for (int idx = 0; idx < numLines; idx++) {
+                    IOUtils.write(RandomStringUtils.random(50, true, false) + "\n", out, "UTF-8");
+                }
+            }
+
+            producedFiles.put(path.toFile());
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            // Re-assert the interrupt so the sleep in run() ends the generation loop.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+}
diff --git
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/GZipFilesTests.java
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/GZipFilesTests.java
new file mode 100644
index 0000000..b08bb00
--- /dev/null
+++
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/GZipFilesTests.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file.utils;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.stream.Stream;
+
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link GZipFiles}: gzip detection on valid, plain-text,
+ * mislabelled and null inputs, and line streaming from a gzip archive.
+ */
+public class GZipFilesTests {
+
+    /** A real gzip archive must be detected as gzip. */
+    @Test
+    public final void validGzipFileTest() {
+        assertTrue(GZipFiles.isGzip(getFile("org/apache/pulsar/io/file/validGzip.gz")));
+    }
+
+    /** A plain text file must not be detected as gzip. */
+    @Test
+    public final void nonGzipFileTest() {
+        assertFalse(GZipFiles.isGzip(getFile("org/apache/pulsar/io/file/nonGzipFile.txt")));
+    }
+
+    /** A text file that merely carries a .gz extension must not be detected as gzip. */
+    @Test
+    public final void mislabelledGzipFileTest() {
+        assertFalse(GZipFiles.isGzip(getFile("org/apache/pulsar/io/file/mislabelled.gz")));
+    }
+
+    /** A null file must not be detected as gzip. */
+    @Test
+    public final void nonExistantGzipFileTest() {
+        assertFalse(GZipFiles.isGzip(null));
+    }
+
+    /**
+     * Streams the lines of the valid gzip archive and checks each one starts
+     * with "Line ". Any exception raised while opening or reading the archive
+     * fails the test instead of being printed and ignored.
+     */
+    @Test
+    public final void streamGzipFileTest() {
+        Path path = Paths.get(getFile("org/apache/pulsar/io/file/validGzip.gz").getAbsolutePath(), "");
+
+        try (Stream<String> lines = GZipFiles.lines(path)) {
+            lines.forEachOrdered(line -> assertTrue(line.startsWith("Line ")));
+        } catch (Exception e) {
+            throw new AssertionError("Unable to stream lines from " + path, e);
+        }
+    }
+
+    /** Resolves a classpath resource name to a {@link File} via this class's class loader. */
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+}
diff --git
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/ZipFilesTests.java
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/ZipFilesTests.java
new file mode 100644
index 0000000..2fc7286
--- /dev/null
+++
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/ZipFilesTests.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file.utils;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.stream.Stream;
+
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link ZipFiles}: zip detection on valid, plain-text, mislabelled
+ * and null inputs, and line streaming from a zip archive.
+ */
+public class ZipFilesTests {
+
+    /** A real zip archive must be detected as zip. */
+    @Test
+    public final void validZipFileTest() {
+        assertTrue(ZipFiles.isZip(getFile("org/apache/pulsar/io/file/validZip.zip")));
+    }
+
+    /** A plain text file must not be detected as zip. */
+    @Test
+    public final void nonZipFileTest() {
+        assertFalse(ZipFiles.isZip(getFile("org/apache/pulsar/io/file/nonGzipFile.txt")));
+    }
+
+    /** A text file carrying an archive extension must not be detected as zip. */
+    @Test
+    public final void mislabelledZipFileTest() {
+        assertFalse(ZipFiles.isZip(getFile("org/apache/pulsar/io/file/mislabelled.gz")));
+    }
+
+    /** A null file must not be detected as zip. */
+    @Test
+    public final void nonExistantGzipFileTest() {
+        assertFalse(ZipFiles.isZip(null));
+    }
+
+    /**
+     * Streams the lines of the valid zip archive and checks each one starts
+     * with "Line ". Any exception raised while opening or reading the archive
+     * fails the test instead of being printed and ignored.
+     */
+    @Test
+    public final void streamZipFileTest() {
+        Path path = Paths.get(getFile("org/apache/pulsar/io/file/validZip.zip").getAbsolutePath(), "");
+
+        try (Stream<String> lines = ZipFiles.lines(path)) {
+            lines.forEachOrdered(line -> assertTrue(line.startsWith("Line ")));
+        } catch (Exception e) {
+            throw new AssertionError("Unable to stream lines from " + path, e);
+        }
+    }
+
+    /** Resolves a classpath resource name to a {@link File} via this class's class loader. */
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+}
diff --git
a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/mislabelled.gz
b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/mislabelled.gz
new file mode 100644
index 0000000..529587e
--- /dev/null
+++ b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/mislabelled.gz
@@ -0,0 +1 @@
+This file isn't gzipped.
\ No newline at end of file
diff --git
a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/nonGzipFile.txt
b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/nonGzipFile.txt
new file mode 100644
index 0000000..fbd35c6
--- /dev/null
+++
b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/nonGzipFile.txt
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+This file is not gzipped
\ No newline at end of file
diff --git
a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validGzip.gz
b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validGzip.gz
new file mode 100644
index 0000000..f7d098d
Binary files /dev/null and
b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validGzip.gz
differ
diff --git
a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validZip.zip
b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validZip.zip
new file mode 100644
index 0000000..55c28e3
Binary files /dev/null and
b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validZip.zip
differ
diff --git a/pulsar-io/file/src/test/resources/sinkConfig.yaml
b/pulsar-io/file/src/test/resources/sinkConfig.yaml
new file mode 100644
index 0000000..554b0f9
--- /dev/null
+++ b/pulsar-io/file/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+ "inputDirectory": "/Users/david",
+ "recurse": true,
+ "keepFile": true,
+ "fileFilter": "[^\\.].*",
+ "pathFilter": "*",
+ "minimumFileAge": 0,
+ "maximumFileAge": 9999999999,
+ "minimumSize": 1,
+ "maximumSize": 5000000,
+ "ignoreHiddenFiles": true,
+ "pollingInterval": 5000,
+ "numWorkers": 1
+}
\ No newline at end of file
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 6503b66..628686b 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -48,6 +48,7 @@
<module>debezium</module>
<module>hdfs2</module>
<module>canal</module>
+ <module>file</module>
<module>netty</module>
</modules>