[FLINK-3677] FileInputFormat: Allow to specify include/exclude file name patterns

This closes #2109


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48109104
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48109104
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48109104

Branch: refs/heads/flip-6
Commit: 4810910431e01bf143ae77a6e93a86f2fafbccd0
Parents: 259a3a5
Author: Ivan Mushketyk <ivan.mushke...@gmail.com>
Authored: Tue Jun 14 22:44:19 2016 +0100
Committer: Maximilian Michels <m...@apache.org>
Committed: Thu Aug 25 16:08:18 2016 +0200

----------------------------------------------------------------------
 flink-core/pom.xml                              |   7 +
 .../flink/api/common/io/FileInputFormat.java    |  20 ++-
 .../flink/api/common/io/FilePathFilter.java     |  69 ++++++++
 .../flink/api/common/io/GlobFilePathFilter.java | 111 ++++++++++++
 .../java/org/apache/flink/core/fs/Path.java     |  10 +-
 .../flink/core/fs/local/LocalFileStatus.java    |   8 +
 .../flink/api/common/io/DefaultFilterTest.java  |  70 ++++++++
 .../api/common/io/FileInputFormatTest.java      | 174 ++++++++++++-------
 .../api/common/io/GlobFilePathFilterTest.java   | 141 +++++++++++++++
 .../ContinuousFileMonitoringFunctionITCase.java |   4 +-
 .../hdfstests/ContinuousFileMonitoringTest.java |  13 +-
 .../environment/StreamExecutionEnvironment.java |  72 ++++++--
 .../ContinuousFileMonitoringFunction.java       |   8 +-
 .../api/functions/source/FilePathFilter.java    |  66 -------
 .../api/scala/StreamExecutionEnvironment.scala  |  43 ++++-
 ...ontinuousFileProcessingCheckpointITCase.java |   5 +-
 16 files changed, 650 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
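The diff below adds a glob-based include/exclude file filter to FileInputFormat. As a rough
usage sketch (not part of this commit; the input path and the patterns are illustrative
assumptions), the new API can be wired up roughly like this:

    import org.apache.flink.api.common.io.GlobFilePathFilter;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;

    import java.util.Arrays;
    import java.util.Collections;

    public class GlobFilterUsageSketch {

        public static void main(String[] args) {
            // Read everything under an (assumed) input directory ...
            TextInputFormat format = new TextInputFormat(new Path("file:///tmp/logs"));

            // ... but skip compressed archives and Hadoop "_COPYING_" temp files.
            format.setFilesFilter(new GlobFilePathFilter(
                    Collections.singletonList("**"),
                    Arrays.asList("**/*.gz", "**/*_COPYING_*")));

            // The configured format can then be handed to
            // ExecutionEnvironment#createInput(...) or
            // StreamExecutionEnvironment#readFile(format, path, watchType, interval).
        }
    }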


http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 9e290a0..dcb2599 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -103,6 +103,13 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 72d6061..d0f5166 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,7 +71,7 @@ public abstract class FileInputFormat<OT> extends 
RichInputFormat<OT, FileInputS
         * The fraction that the last split may be larger than the others.
         */
        private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
-       
+
        /**
         * The timeout (in milliseconds) to wait for a filesystem stream to 
respond.
         */
@@ -218,7 +219,12 @@ public abstract class FileInputFormat<OT> extends 
RichInputFormat<OT, FileInputS
         * structure is enabled.
         */
        protected boolean enumerateNestedFiles = false;
-       
+
+       /**
+        * The filter used to determine which files/directories are included.
+        */
+       private FilePathFilter filesFilter = new GlobFilePathFilter();
+
        // 
--------------------------------------------------------------------------------------------
        //  Constructors
        // 
--------------------------------------------------------------------------------------------
 
@@ -332,6 +338,10 @@ public abstract class FileInputFormat<OT> extends 
RichInputFormat<OT, FileInputS
                return splitLength;
        }
 
+       public void setFilesFilter(FilePathFilter filesFilter) {
+               this.filesFilter = Preconditions.checkNotNull(filesFilter, 
"Files filter should not be null");
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Pre-flight: Configuration, Splits, Sampling
        // 
--------------------------------------------------------------------------------------------
@@ -625,7 +635,9 @@ public abstract class FileInputFormat<OT> extends 
RichInputFormat<OT, FileInputS
         */
        protected boolean acceptFile(FileStatus fileStatus) {
                final String name = fileStatus.getPath().getName();
-               return !name.startsWith("_") && !name.startsWith(".");
+               return !name.startsWith("_")
+                       && !name.startsWith(".")
+                       && !filesFilter.filterPath(fileStatus.getPath());
        }
 
        /**
@@ -735,7 +747,7 @@ public abstract class FileInputFormat<OT> extends 
RichInputFormat<OT, FileInputS
                        "File Input (unknown file)" :
                        "File Input (" + this.filePath.toString() + ')';
        }
-       
+
        // 
============================================================================================
        
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java
new file mode 100644
index 0000000..4ab896c
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * A filter for paths processed by a {@link FileInputFormat}. The {@link #filterPath(Path)}
+ * method decides whether a path is eligible for further processing or not. This can serve
+ * to exclude temporary or partial files that are still being written.
+ */
+@PublicEvolving
+public abstract class FilePathFilter implements Serializable {
+
+       // Name of an unfinished Hadoop file
+       public static final String HADOOP_COPYING = "_COPYING_";
+
+       public static FilePathFilter createDefaultFilter() {
+               return new DefaultFilter();
+       }
+
+       /**
+        * Returns {@code true} if the {@code filePath} given is to be
+        * ignored when processing a directory, e.g.
+        * <pre>
+        * {@code
+        *
+        * public boolean filterPath(Path filePath) {
+        *     return filePath.getName().startsWith(".") || filePath.getName().contains("_COPYING_");
+        * }
+        * }</pre>
+        */
+       public abstract boolean filterPath(Path filePath);
+
+       /**
+        * The default file path filter. It is used if no other filter is provided and
+        * excludes files whose names start with "." or "_", or contain "_COPYING_".
+        */
+       public static class DefaultFilter extends FilePathFilter {
+
+               DefaultFilter() {}
+
+               @Override
+               public boolean filterPath(Path filePath) {
+                       return filePath == null ||
+                               filePath.getName().startsWith(".") ||
+                               filePath.getName().startsWith("_") ||
+                               filePath.getName().contains(HADOOP_COPYING);
+               }
+       }
+}
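As a quick illustration of the contract above (a hedged sketch, not part of the commit;
the ".tmp" rule is an assumed example), a custom filter only needs to extend the new
abstract class and return true for paths that should be skipped:

    import org.apache.flink.api.common.io.FilePathFilter;
    import org.apache.flink.core.fs.Path;

    // Skips null paths and files whose names end in ".tmp".
    public class TmpFileFilter extends FilePathFilter {

        private static final long serialVersionUID = 1L;

        @Override
        public boolean filterPath(Path filePath) {
            // Returning true means "ignore this path".
            return filePath == null || filePath.getName().endsWith(".tmp");
        }
    }

Such a filter would then be registered via FileInputFormat#setFilesFilter(new TmpFileFilter()).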

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
new file mode 100644
index 0000000..4aaf481
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A {@link FilePathFilter} that determines whether a file is included or excluded
+ * based on sets of include and exclude glob patterns.
+ *
+ * Glob patterns support the following expressions:
+ * <ul>
+ *     <li>* - matches any number of any characters including none</li>
+ *     <li>** - matches any file in all subdirectories</li>
+ *     <li>? - matches any single character</li>
+ *     <li>[abc] - matches one of the characters listed in the brackets</li>
+ *     <li>[a-z] - matches one character from the range given in the brackets</li>
+ * </ul>
+ *
+ * <p> If a file does not match any include pattern it is excluded. If it matches an include
+ * pattern but also matches an exclude pattern it is excluded.
+ *
+ * <p> If no patterns are provided, all files are included.
+ */
+@Internal
+public class GlobFilePathFilter extends FilePathFilter {
+
+       private static final long serialVersionUID = 1L;
+
+       private final List<PathMatcher> includeMatchers;
+       private final List<PathMatcher> excludeMatchers;
+
+       /**
+        * Constructor for GlobFilePathFilter that includes all files.
+        */
+       public GlobFilePathFilter() {
+               this(Collections.<String>emptyList(), 
Collections.<String>emptyList());
+       }
+
+       /**
+        * Constructor for GlobFilePathFilter
+        *
+        * @param includePatterns glob patterns for files to include
+        * @param excludePatterns glob patterns for files to exclude
+        */
+       public GlobFilePathFilter(List<String> includePatterns, List<String> 
excludePatterns) {
+               includeMatchers = buildPatterns(includePatterns);
+               excludeMatchers = buildPatterns(excludePatterns);
+       }
+
+       private List<PathMatcher> buildPatterns(List<String> patterns) {
+               FileSystem fileSystem = FileSystems.getDefault();
+               List<PathMatcher> matchers = new ArrayList<>();
+
+               for (String patternStr : patterns) {
+                       matchers.add(fileSystem.getPathMatcher("glob:" + 
patternStr));
+               }
+
+               return matchers;
+       }
+
+       @Override
+       public boolean filterPath(Path filePath) {
+               if (includeMatchers.isEmpty() && excludeMatchers.isEmpty()) {
+                       return false;
+               }
+
+               for (PathMatcher matcher : includeMatchers) {
+                       if (matcher.matches(Paths.get(filePath.getPath()))) {
+                               return shouldExclude(filePath);
+                       }
+               }
+
+               return true;
+       }
+
+       private boolean shouldExclude(Path filePath) {
+               for (PathMatcher matcher : excludeMatchers) {
+                       if (matcher.matches(Paths.get(filePath.getPath()))) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+}
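To make the include/exclude semantics above concrete (a hedged sketch with assumed
patterns, not taken from the commit), filterPath returns true, i.e. "exclude", when a
path misses every include pattern or hits an exclude pattern:

    import org.apache.flink.api.common.io.GlobFilePathFilter;
    import org.apache.flink.core.fs.Path;

    import java.util.Collections;

    public class GlobSemanticsSketch {

        public static void main(String[] args) {
            GlobFilePathFilter filter = new GlobFilePathFilter(
                    Collections.singletonList("**/*.txt"),       // include
                    Collections.singletonList("**/secret.txt")); // exclude

            System.out.println(filter.filterPath(new Path("dir/data.txt")));   // false -> included
            System.out.println(filter.filterPath(new Path("dir/secret.txt"))); // true  -> matches exclude
            System.out.println(filter.filterPath(new Path("dir/image.png")));  // true  -> no include match
        }
    }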

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
index 4c77199..7adfa42 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java
@@ -145,7 +145,7 @@ public class Path implements IOReadableWritable, 
Serializable {
        }
 
        /**
-        * Checks if the provided path string is either null or has zero length 
and throws
+        * Checks if the provided path string is either null or has zero length 
and throws
         * a {@link IllegalArgumentException} if any of the two conditions 
apply.
         * In addition, leading and tailing whitespaces are removed.
         *
@@ -333,6 +333,14 @@ public class Path implements IOReadableWritable, 
Serializable {
        }
 
        /**
+        * Returns the path component of this Path's URI, i.e. the full path without scheme and authority.
+        * @return the full path
+        */
+       public String getPath() {
+               return uri.getPath();
+       }
+
+       /**
         * Returns the parent of a path, i.e., everything that precedes the 
last separator
         * or <code>null</code> if at root.
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
index 0aebd75..3e127ff 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
@@ -102,4 +102,12 @@ public class LocalFileStatus implements FileStatus {
        public File getFile() {
                return this.file;
        }
+
+       @Override
+       public String toString() {
+               return "LocalFileStatus{" +
+                       "file=" + file +
+                       ", path=" + path +
+                       '}';
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java
new file mode 100644
index 0000000..6956518
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.io;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class DefaultFilterTest {
+       @Parameters
+       public static Collection<Object[]> data() {
+               return Arrays.asList(new Object[][] {
+                       {"file.txt",                    false},
+
+                       {".file.txt",                   true},
+                       {"dir/.file.txt",               true},
+                       {".dir/file.txt",               false},
+
+                       {"_file.txt",                   true},
+                       {"dir/_file.txt",               true},
+                       {"_dir/file.txt",               false},
+
+                       // Check filtering Hadoop's unfinished files
+                       {FilePathFilter.HADOOP_COPYING,                 true},
+                       {"dir/" + FilePathFilter.HADOOP_COPYING,                
true},
+                       {FilePathFilter.HADOOP_COPYING + "/file.txt",   false},
+               });
+       }
+
+       private final boolean shouldFilter;
+       private final String filePath;
+
+       public DefaultFilterTest(String filePath, boolean shouldFilter) {
+               this.filePath = filePath;
+               this.shouldFilter = shouldFilter;
+       }
+
+       @Test
+       public void test() {
+               FilePathFilter defaultFilter = 
FilePathFilter.createDefaultFilter();
+               Path path = new Path(filePath);
+               assertEquals(
+                       String.format("File: %s", filePath),
+                       shouldFilter,
+                       defaultFilter.filterPath(path));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index ae8802b..dcd6583 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.io;
 
+import com.google.common.collect.Lists;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
@@ -27,16 +28,17 @@ import org.apache.flink.testutils.TestFileUtils;
 import org.apache.flink.types.IntValue;
 
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.BufferedOutputStream;
-import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.Collections;
 
 import static org.junit.Assert.*;
 
@@ -45,6 +47,9 @@ import static org.junit.Assert.*;
  */
 public class FileInputFormatTest {
 
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
        // 
------------------------------------------------------------------------
        //  Statistics
        // 
------------------------------------------------------------------------
@@ -257,41 +262,21 @@ public class FileInputFormatTest {
        public void testIgnoredUnderscoreFiles() {
                try {
                        final String contents = "CONTENTS";
-                       
+
                        // create some accepted, some ignored files
-                       
-                       File tempDir = new 
File(System.getProperty("java.io.tmpdir"));
-                       File f;
-                       do {
-                               f = new File(tempDir, 
TestFileUtils.randomFileName(""));
-                       }
-                       while (f.exists());
 
-                       assertTrue(f.mkdirs());
-                       f.deleteOnExit();
-                       
-                       File child1 = new File(f, "dataFile1.txt");
-                       File child2 = new File(f, "another_file.bin");
-                       File luigiFile = new File(f, "_luigi");
-                       File success = new File(f, "_SUCCESS");
-                       
-                       File[] files = { child1, child2, luigiFile, success };
-                       
-                       for (File child : files) {
-                               child.deleteOnExit();
-                       
-                               BufferedWriter out = new BufferedWriter(new 
FileWriter(child));
-                               try { 
-                                       out.write(contents);
-                               } finally {
-                                       out.close();
-                               }
-                       }
-                       
+
+                       File child1 = temporaryFolder.newFile("dataFile1.txt");
+                       File child2 = 
temporaryFolder.newFile("another_file.bin");
+                       File luigiFile = temporaryFolder.newFile("_luigi");
+                       File success = temporaryFolder.newFile("_SUCCESS");
+
+                       createTempFiles(contents.getBytes(), child1, child2, 
luigiFile, success);
+
                        // test that only the valid files are accepted
                        
                        final DummyFileInputFormat format = new 
DummyFileInputFormat();
-                       format.setFilePath(f.toURI().toString());
+                       
format.setFilePath(temporaryFolder.getRoot().toURI().toString());
                        format.configure(new Configuration());
                        FileInputSplit[] splits = format.createInputSplits(1);
                        
@@ -314,43 +299,95 @@ public class FileInputFormatTest {
        }
 
        @Test
+       public void testExcludeFiles() {
+               try {
+                       final String contents = "CONTENTS";
+
+                       // create two files; one of them will be excluded by the glob filter
+
+                       File child1 = temporaryFolder.newFile("dataFile1.txt");
+                       File child2 = 
temporaryFolder.newFile("another_file.bin");
+
+                       File[] files = { child1, child2 };
+
+                       createTempFiles(contents.getBytes(), files);
+
+                       // test that only the valid files are accepted
+
+                       Configuration configuration = new Configuration();
+
+                       final DummyFileInputFormat format = new 
DummyFileInputFormat();
+                       
format.setFilePath(temporaryFolder.getRoot().toURI().toString());
+                       format.configure(configuration);
+                       format.setFilesFilter(new GlobFilePathFilter(
+                               Collections.singletonList("**"),
+                               
Collections.singletonList("**/another_file.bin")));
+                       FileInputSplit[] splits = format.createInputSplits(1);
+
+                       Assert.assertEquals(1, splits.length);
+
+                       final URI uri1 = splits[0].getPath().toUri();
+
+                       final URI childUri1 = child1.toURI();
+
+                       Assert.assertEquals(uri1, childUri1);
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       Assert.fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testReadMultiplePatterns() {
+               try {
+                       final String contents = "CONTENTS";
+
+                       // create two files; both will be excluded by the glob patterns
+
+                       File child1 = temporaryFolder.newFile("dataFile1.txt");
+                       File child2 = 
temporaryFolder.newFile("another_file.bin");
+                       createTempFiles(contents.getBytes(), child1, child2);
+
+                       // test that only the valid files are accepted
+
+                       Configuration configuration = new Configuration();
+
+                       final DummyFileInputFormat format = new 
DummyFileInputFormat();
+                       
format.setFilePath(temporaryFolder.getRoot().toURI().toString());
+                       format.configure(configuration);
+                       format.setFilesFilter(new GlobFilePathFilter(
+                               Collections.singletonList("**"),
+                               Lists.newArrayList("**/another_file.bin", 
"**/dataFile1.txt")
+                       ));
+                       FileInputSplit[] splits = format.createInputSplits(1);
+
+                       Assert.assertEquals(0, splits.length);
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       Assert.fail(e.getMessage());
+               }
+       }
+
+       @Test
        public void testGetStatsIgnoredUnderscoreFiles() {
                try {
-                       final long SIZE = 2048;
+                       final int SIZE = 2048;
                        final long TOTAL = 2*SIZE;
 
                        // create two accepted and two ignored files
-                       File tempDir = new 
File(System.getProperty("java.io.tmpdir"));
-                       File f;
-                       do {
-                               f = new File(tempDir, 
TestFileUtils.randomFileName(""));
-                       }
-                       while (f.exists());
-                       
-                       assertTrue(f.mkdirs());
-                       f.deleteOnExit();
-
-                       File child1 = new File(f, "dataFile1.txt");
-                       File child2 = new File(f, "another_file.bin");
-                       File luigiFile = new File(f, "_luigi");
-                       File success = new File(f, "_SUCCESS");
-
-                       File[] files = { child1, child2, luigiFile, success };
+                       File child1 = temporaryFolder.newFile("dataFile1.txt");
+                       File child2 = 
temporaryFolder.newFile("another_file.bin");
+                       File luigiFile = temporaryFolder.newFile("_luigi");
+                       File success = temporaryFolder.newFile("_SUCCESS");
 
-                       for (File child : files) {
-                               child.deleteOnExit();
+                       createTempFiles(new byte[SIZE], child1, child2, 
luigiFile, success);
 
-                               BufferedOutputStream out = new 
BufferedOutputStream(new FileOutputStream(child));
-                               try {
-                                       for (long bytes = SIZE; bytes > 0; 
bytes--) {
-                                               out.write(0);
-                                       }
-                               } finally {
-                                       out.close();
-                               }
-                       }
                        final DummyFileInputFormat format = new 
DummyFileInputFormat();
-                       format.setFilePath(f.toURI().toString());
+                       
format.setFilePath(temporaryFolder.getRoot().toURI().toString());
                        format.configure(new Configuration());
 
                        // check that only valid files are used for statistics 
computation
@@ -406,7 +443,20 @@ public class FileInputFormatTest {
        }
        
        // 
------------------------------------------------------------------------
-       
+
+       private void createTempFiles(byte[] contents, File... files) throws 
IOException {
+               for (File child : files) {
+                       child.deleteOnExit();
+
+                       BufferedOutputStream out = new BufferedOutputStream(new 
FileOutputStream(child));
+                       try {
+                               out.write(contents);
+                       } finally {
+                               out.close();
+                       }
+               }
+       }
+
        private class DummyFileInputFormat extends FileInputFormat<IntValue> {
                private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
new file mode 100644
index 0000000..bced076
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class GlobFilePathFilterTest {
+       @Test
+       public void defaultConstructorCreateMatchAllFilter() {
+               GlobFilePathFilter matcher = new GlobFilePathFilter();
+               assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+       }
+
+       @Test
+       public void matchAllFilesByDefault() {
+               GlobFilePathFilter matcher = new GlobFilePathFilter(
+                       Collections.<String>emptyList(),
+                       Collections.<String>emptyList());
+
+               assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+       }
+
+       @Test
+       public void excludeFilesNotInIncludePatterns() {
+               GlobFilePathFilter matcher = new GlobFilePathFilter(
+                       Collections.singletonList("dir/*"),
+                       Collections.<String>emptyList());
+
+               assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+               assertTrue(matcher.filterPath(new Path("dir1/file.txt")));
+       }
+
+       @Test
+       public void excludeFilesIfMatchesExclude() {
+               GlobFilePathFilter matcher = new GlobFilePathFilter(
+                       Collections.singletonList("dir/*"),
+                       Collections.singletonList("dir/file.txt"));
+
+               assertTrue(matcher.filterPath(new Path("dir/file.txt")));
+       }
+
+       @Test
+       public void includeFileWithAnyCharacterMatcher() {
+               GlobFilePathFilter matcher = new GlobFilePathFilter(
+                       Collections.singletonList("dir/?.txt"),
+                       Collections.<String>emptyList());
+
+               assertFalse(matcher.filterPath(new Path("dir/a.txt")));
+               assertTrue(matcher.filterPath(new Path("dir/aa.txt")));
+       }
+
+       @Test
+       public void includeFileWithCharacterSetMatcher() {
+               GlobFilePathFilter matcher = new GlobFilePathFilter(
+                       Collections.singletonList("dir/[acd].txt"),
+                       Collections.<String>emptyList());
+
+               assertFalse(matcher.filterPath(new Path("dir/a.txt")));
+               assertFalse(matcher.filterPath(new Path("dir/c.txt")));
+               assertFalse(matcher.filterPath(new Path("dir/d.txt")));
+               assertTrue(matcher.filterPath(new Path("dir/z.txt")));
+       }
+
+       @Test
+       public void includeFileWithCharacterRangeMatcher() {
+               GlobFilePathFilter matcher = new GlobFilePathFilter(
+                       Collections.singletonList("dir/[a-d].txt"),
+                       Collections.<String>emptyList());
+
+               assertFalse(matcher.filterPath(new Path("dir/a.txt")));
+               assertFalse(matcher.filterPath(new Path("dir/b.txt")));
+               assertFalse(matcher.filterPath(new Path("dir/c.txt")));
+               assertFalse(matcher.filterPath(new Path("dir/d.txt")));
+               assertTrue(matcher.filterPath(new Path("dir/z.txt")));
+       }
+
+       @Test
+       public void excludeHDFSFile() {
+               GlobFilePathFilter matcher = new GlobFilePathFilter(
+                       Collections.singletonList("**"),
+                       Collections.singletonList("/dir/file2.txt"));
+
+               assertFalse(matcher.filterPath(new 
Path("hdfs:///dir/file1.txt")));
+               assertTrue(matcher.filterPath(new 
Path("hdfs:///dir/file2.txt")));
+               assertFalse(matcher.filterPath(new 
Path("hdfs:///dir/file3.txt")));
+       }
+
+       @Test
+       public void excludeFilenameWithStar() {
+               GlobFilePathFilter matcher = new GlobFilePathFilter(
+                       Collections.singletonList("**"),
+                       Collections.singletonList("\\*"));
+
+               assertTrue(matcher.filterPath(new Path("*")));
+               assertFalse(matcher.filterPath(new Path("**")));
+               assertFalse(matcher.filterPath(new Path("other.txt")));
+       }
+
+       @Test
+       public void singleStarPattern() {
+               GlobFilePathFilter matcher = new GlobFilePathFilter(
+                       Collections.singletonList("*"),
+                       Collections.<String>emptyList());
+
+               assertFalse(matcher.filterPath(new Path("a")));
+               assertTrue(matcher.filterPath(new Path("a/b")));
+               assertTrue(matcher.filterPath(new Path("a/b/c")));
+       }
+
+       @Test
+       public void doubleStarPattern() {
+               GlobFilePathFilter matcher = new GlobFilePathFilter(
+                       Collections.singletonList("**"),
+                       Collections.<String>emptyList());
+
+               assertFalse(matcher.filterPath(new Path("a")));
+               assertFalse(matcher.filterPath(new Path("a/b")));
+               assertFalse(matcher.filterPath(new Path("a/b/c")));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
index e6cd5d9..663345c 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.api.common.io.FilePathFilter;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
@@ -122,9 +122,9 @@ public class ContinuousFileMonitoringFunctionITCase extends 
StreamingProgramTest
                        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                        env.setParallelism(1);
 
+                       
format.setFilesFilter(FilePathFilter.createDefaultFilter());
                        ContinuousFileMonitoringFunction<String> 
monitoringFunction =
                                new ContinuousFileMonitoringFunction<>(format, 
hdfsURI,
-                                       FilePathFilter.createDefaultFilter(),
                                        FileProcessingMode.PROCESS_CONTINUOUSLY,
                                        env.getParallelism(), INTERVAL);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index def9378..4aadaec 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.api.common.io.FilePathFilter;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
@@ -216,8 +216,9 @@ public class ContinuousFileMonitoringTest {
                }
 
                TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+               format.setFilesFilter(new PathFilter());
                ContinuousFileMonitoringFunction<String> monitoringFunction =
-                       new ContinuousFileMonitoringFunction<>(format, hdfsURI, 
new PathFilter(),
+                       new ContinuousFileMonitoringFunction<>(format, hdfsURI,
                                FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
                monitoringFunction.open(new Configuration());
@@ -242,8 +243,9 @@ public class ContinuousFileMonitoringTest {
                fc.start();
 
                TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+               format.setFilesFilter(FilePathFilter.createDefaultFilter());
                ContinuousFileMonitoringFunction<String> monitoringFunction =
-                       new ContinuousFileMonitoringFunction<>(format, hdfsURI, 
FilePathFilter.createDefaultFilter(),
+                       new ContinuousFileMonitoringFunction<>(format, hdfsURI,
                                FileProcessingMode.PROCESS_CONTINUOUSLY, 1, 
INTERVAL);
 
                monitoringFunction.open(new Configuration());
@@ -291,8 +293,9 @@ public class ContinuousFileMonitoringTest {
                Assert.assertTrue(fc.getFilesCreated().size() >= 1);
 
                TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+               format.setFilesFilter(FilePathFilter.createDefaultFilter());
                ContinuousFileMonitoringFunction<String> monitoringFunction =
-                       new ContinuousFileMonitoringFunction<>(format, hdfsURI, 
FilePathFilter.createDefaultFilter(),
+                       new ContinuousFileMonitoringFunction<>(format, hdfsURI,
                                FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
 
                monitoringFunction.open(new Configuration());
@@ -427,7 +430,7 @@ public class ContinuousFileMonitoringTest {
                assert (hdfs != null);
 
                org.apache.hadoop.fs.Path file = new 
org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
-               Assert.assertTrue (!hdfs.exists(file));
+               Assert.assertFalse(hdfs.exists(file));
 
                org.apache.hadoop.fs.Path tmp = new 
org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
                FSDataOutputStream stream = hdfs.create(tmp);

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 1913a36..ead9564 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -52,7 +52,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.streaming.api.functions.source.FileReadFunction;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
@@ -917,11 +917,11 @@ public abstract class StreamExecutionEnvironment {
                Preconditions.checkNotNull(filePath.isEmpty(), "The file path 
must not be empty.");
 
                TextInputFormat format = new TextInputFormat(new 
Path(filePath));
+               format.setFilesFilter(FilePathFilter.createDefaultFilter());
                TypeInformation<String> typeInfo = 
BasicTypeInfo.STRING_TYPE_INFO;
                format.setCharsetName(charsetName);
 
-               return readFile(format, filePath, 
FileProcessingMode.PROCESS_ONCE, -1,
-                       FilePathFilter.createDefaultFilter(), typeInfo);
+               return readFile(format, filePath, 
FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
        }
 
        /**
@@ -952,7 +952,52 @@ public abstract class StreamExecutionEnvironment {
         */
        public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> 
inputFormat,
                                                                                
                String filePath) {
-               return readFile(inputFormat, filePath, 
FileProcessingMode.PROCESS_ONCE, -1, FilePathFilter.createDefaultFilter());
+               return readFile(inputFormat, filePath, 
FileProcessingMode.PROCESS_ONCE, -1);
+       }
+
+       /**
+        * Reads the contents of the user-specified {@code filePath} based on the given
+        * {@link FileInputFormat}, depending on the provided {@link FileProcessingMode}.
+        * <p>
+        * See {@link #readFile(FileInputFormat, String, FileProcessingMode, long)}
+        *
+        * @param inputFormat
+        *              The input format used to create the data stream
+        * @param filePath
+        *              The path of the file, as a URI (e.g., 
"file:///some/local/file" or "hdfs://host:port/file/path")
+        * @param watchType
+        *              The mode in which the source should operate, i.e. 
monitor path and react to new data, or process once and exit
+        * @param interval
+        *              In the case of periodic path monitoring, this specifies 
the interval (in millis) between consecutive path scans
+        * @param filter
+        *              The files to be excluded from the processing
+        * @param <OUT>
+        *              The type of the returned data stream
+        * @return The data stream that represents the data read from the given 
file
+        *
+        * @deprecated Use {@link 
FileInputFormat#setFilesFilter(FilePathFilter)} to set a filter and
+        *              {@link 
StreamExecutionEnvironment#readFile(FileInputFormat, String, 
FileProcessingMode, long)}
+        *
+        */
+       @PublicEvolving
+       @Deprecated
+       public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> 
inputFormat,
+                                                                               
                String filePath,
+                                                                               
                FileProcessingMode watchType,
+                                                                               
                long interval,
+                                                                               
                FilePathFilter filter) {
+               inputFormat.setFilesFilter(filter);
+
+               TypeInformation<OUT> typeInformation;
+               try {
+                       typeInformation = 
TypeExtractor.getInputFormatTypes(inputFormat);
+               } catch (Exception e) {
+                       throw new InvalidProgramException("The type returned by 
the input format could not be " +
+                               "automatically determined. Please specify the 
TypeInformation of the produced type " +
+                               "explicitly by using the 
'createInput(InputFormat, TypeInformation)' method instead.");
+               }
+               return readFile(inputFormat, filePath, watchType, interval, 
typeInformation);
        }
 
        /**
@@ -986,8 +1031,6 @@ public abstract class StreamExecutionEnvironment {
         *              The mode in which the source should operate, i.e. 
monitor path and react to new data, or process once and exit
         * @param interval
         *              In the case of periodic path monitoring, this specifies 
the interval (in millis) between consecutive path scans
-        * @param filter
-        *              The files to be excluded from the processing
         * @param <OUT>
         *              The type of the returned data stream
         * @return The data stream that represents the data read from the given 
file
@@ -996,8 +1039,7 @@ public abstract class StreamExecutionEnvironment {
        public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> 
inputFormat,
                                                                                
                String filePath,
                                                                                
                FileProcessingMode watchType,
-                                                                               
                long interval,
-                                                                               
                FilePathFilter filter) {
+                                                                               
                long interval) {
 
                TypeInformation<OUT> typeInformation;
                try {
@@ -1007,7 +1049,7 @@ public abstract class StreamExecutionEnvironment {
                                "automatically determined. Please specify the 
TypeInformation of the produced type " +
                                "explicitly by using the 
'createInput(InputFormat, TypeInformation)' method instead.");
                }
-               return readFile(inputFormat, filePath, watchType, interval, 
filter, typeInformation);
+               return readFile(inputFormat, filePath, watchType, interval, 
typeInformation);
        }
 
        /**
@@ -1057,8 +1099,6 @@ public abstract class StreamExecutionEnvironment {
         *              The path of the file, as a URI (e.g., 
"file:///some/local/file" or "hdfs://host:port/file/path")
         * @param watchType
         *              The mode in which the source should operate, i.e. 
monitor path and react to new data, or process once and exit
-        * @param filter
-        *              The files to be excluded from the processing
         * @param typeInformation
         *              Information on the type of the elements in the output 
stream
         * @param interval
@@ -1072,7 +1112,6 @@ public abstract class StreamExecutionEnvironment {
                                                                                
                String filePath,
                                                                                
                FileProcessingMode watchType,
                                                                                
                long interval,
-                                                                               
                FilePathFilter filter,
                                                                                
                TypeInformation<OUT> typeInformation) {
 
                Preconditions.checkNotNull(inputFormat, "InputFormat must not 
be null.");
@@ -1080,7 +1119,7 @@ public abstract class StreamExecutionEnvironment {
                Preconditions.checkNotNull(filePath.isEmpty(), "The file path 
must not be empty.");
 
                inputFormat.setFilePath(filePath);
-               return createFileInput(inputFormat, typeInformation, "Custom 
File Source", watchType, filter, interval);
+               return createFileInput(inputFormat, typeInformation, "Custom 
File Source", watchType, interval);
        }
 
        /**
@@ -1250,8 +1289,7 @@ public abstract class StreamExecutionEnvironment {
                if (inputFormat instanceof FileInputFormat) {
                        FileInputFormat<OUT> format = (FileInputFormat<OUT>) 
inputFormat;
                        source = createFileInput(format, typeInfo, "Custom File 
source",
-                               FileProcessingMode.PROCESS_ONCE,
-                               FilePathFilter.createDefaultFilter(),  -1);
+                               FileProcessingMode.PROCESS_ONCE, -1);
                } else {
                        source = createInput(inputFormat, typeInfo, "Custom 
Source");
                }
@@ -1270,14 +1308,12 @@ public abstract class StreamExecutionEnvironment {
                                                                                
                                TypeInformation<OUT> typeInfo,
                                                                                
                                String sourceName,
                                                                                
                                FileProcessingMode monitoringMode,
-                                                                               
                                FilePathFilter pathFilter,
                                                                                
                                long interval) {
 
                Preconditions.checkNotNull(inputFormat, "Unspecified file input 
format.");
                Preconditions.checkNotNull(typeInfo, "Unspecified output type 
information.");
                Preconditions.checkNotNull(sourceName, "Unspecified name for 
the source.");
                Preconditions.checkNotNull(monitoringMode, "Unspecified 
monitoring mode.");
-               Preconditions.checkNotNull(pathFilter, "Unspecified path name 
filtering function.");
 
                
Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE)
 ||
                        interval >= 
ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
@@ -1286,7 +1322,7 @@ public abstract class StreamExecutionEnvironment {
 
                ContinuousFileMonitoringFunction<OUT> monitoringFunction = new 
ContinuousFileMonitoringFunction<>(
                        inputFormat, inputFormat.getFilePath().toString(),
-                       pathFilter, monitoringMode, getParallelism(), interval);
+                       monitoringMode, getParallelism(), interval);
 
                ContinuousFileReaderOperator<OUT, ?> reader = new 
ContinuousFileReaderOperator<>(inputFormat);
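
The StreamExecutionEnvironment changes above deprecate the readFile overload that takes a
FilePathFilter and move the filter onto the input format itself. A hedged migration sketch
(the path and the monitoring interval are illustrative assumptions):

    import org.apache.flink.api.common.io.FilePathFilter;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    public class ReadFileMigrationSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            String path = "file:///tmp/input";

            TextInputFormat format = new TextInputFormat(new Path(path));

            // Before (now deprecated): the filter was passed to readFile directly.
            // env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000,
            //         FilePathFilter.createDefaultFilter());

            // After: configure the filter on the format, then use the shorter overload.
            format.setFilesFilter(FilePathFilter.createDefaultFilter());
            DataStream<String> lines = env.readFile(
                    format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000);

            lines.print();
            env.execute("readFile migration sketch");
        }
    }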
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 8ff4a2a..d36daab 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.functions.source;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -81,15 +82,13 @@ public class ContinuousFileMonitoringFunction<OUT>
 
        private Long globalModificationTime;
 
-       private FilePathFilter pathFilter;
-
        private transient Object checkpointLock;
 
        private volatile boolean isRunning = true;
 
        public ContinuousFileMonitoringFunction(
                FileInputFormat<OUT> format, String path,
-               FilePathFilter filter, FileProcessingMode watchType,
+               FileProcessingMode watchType,
                int readerParallelism, long interval) {
 
		if (watchType != FileProcessingMode.PROCESS_ONCE && interval < MIN_MONITORING_INTERVAL) {
@@ -98,7 +97,6 @@ public class ContinuousFileMonitoringFunction<OUT>
		}
		this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format.");
		this.path = Preconditions.checkNotNull(path, "Unspecified Path.");
-		this.pathFilter = Preconditions.checkNotNull(filter, "Unspecified File Path Filter.");
 
                this.interval = interval;
                this.watchType = watchType;
@@ -274,7 +272,7 @@ public class ContinuousFileMonitoringFunction<OUT>
         */
        private boolean shouldIgnore(Path filePath, long modificationTime) {
                assert (Thread.holdsLock(checkpointLock));
-		boolean shouldIgnore = ((pathFilter != null && pathFilter.filterPath(filePath)) || modificationTime <= globalModificationTime);
+		boolean shouldIgnore = modificationTime <= globalModificationTime;
		if (shouldIgnore) {
			LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + globalModificationTime);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
deleted file mode 100644
index 1a359ab..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.fs.Path;
-
-import java.io.Serializable;
-
-/**
- * An interface to be implemented by the user when using the {@link ContinuousFileMonitoringFunction}.
- * The {@link #filterPath(Path)} method is responsible for deciding if a path is eligible for further
- * processing or not. This can serve to exclude temporary or partial files that
- * are still being written.
- */
-@PublicEvolving
-public abstract class FilePathFilter implements Serializable {
-
-       public static FilePathFilter createDefaultFilter() {
-               return new DefaultFilter();
-       }
-       /**
-        * Returns {@code true} if the {@code filePath} given is to be
-        * ignored when processing a directory, e.g.
-        * <pre>
-        * {@code
-        *
-        * public boolean filterPaths(Path filePath) {
-        *     return filePath.getName().startsWith(".") || filePath.getName().contains("_COPYING_");
-        * }
-        * }</pre>
-        */
-       public abstract boolean filterPath(Path filePath);
-
-       /**
-        * The default file path filtering method and is used
-        * if no other such function is provided. This filter leaves out
-        * files starting with ".", "_", and "_COPYING_".
-        */
-       public static class DefaultFilter extends FilePathFilter {
-
-               DefaultFilter() {}
-
-               @Override
-               public boolean filterPath(Path filePath) {
-                       return filePath == null ||
-                               filePath.getName().startsWith(".") ||
-                               filePath.getName().startsWith("_") ||
-                               filePath.getName().contains("_COPYING_");
-               }
-       }
-}
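The streaming-specific FilePathFilter above is removed in favour of the FilePathFilter added to org.apache.flink.api.common.io (see the diffstat at the top of this message). As a rough sketch only, assuming the relocated class keeps the filterPath(Path) contract of the deleted one (return true for paths that should be ignored), a user-defined filter would now look like this:

    import org.apache.flink.api.common.io.FilePathFilter;
    import org.apache.flink.core.fs.Path;

    // Sketch of a custom filter against the relocated class; the filterPath(Path)
    // contract (true = ignore the path) is assumed to match the deleted version.
    public class TmpFileFilter extends FilePathFilter {

        @Override
        public boolean filterPath(Path filePath) {
            // Skip hidden files and in-flight copies, like the old DefaultFilter did.
            return filePath == null
                || filePath.getName().startsWith(".")
                || filePath.getName().endsWith(".tmp")
                || filePath.getName().contains("_COPYING_");
        }
    }

It would then be attached with format.setFilesFilter(new TmpFileFilter()) instead of being passed to readFile().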

http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index f6dab1e..9cb36a5 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala
 
 import com.esotericsoftware.kryo.Serializer
 import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
-import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
+import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
@@ -467,6 +467,40 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
    * Reads the contents of the user-specified path based on the given [[FileInputFormat]].
+    * How the path is read depends on the provided [[FileProcessingMode]].
+    *
+    * @param inputFormat
+    *          The input format used to create the data stream
+    * @param filePath
+    *          The path of the file, as a URI (e.g., "file:///some/local/file" or
+    *          "hdfs://host:port/file/path")
+    * @param watchType
+    *          The mode in which the source should operate, i.e. monitor path and react
+    *          to new data, or process once and exit
+    * @param interval
+    *          In the case of periodic path monitoring, this specifies the interval (in millis)
+    *          between consecutive path scans
+    * @param filter
+    *          The files to be excluded from the processing
+    * @return The data stream that represents the data read from the given file
+    *
+    * @deprecated Use {@link FileInputFormat#setFilesFilter(FilePathFilter)} to set a filter and
+    *             {@link StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)} instead.
+  @PublicEvolving
+  @Deprecated
+  def readFile[T: TypeInformation](
+                                    inputFormat: FileInputFormat[T],
+                                    filePath: String,
+                                    watchType: FileProcessingMode,
+                                    interval: Long,
+                                    filter: FilePathFilter): DataStream[T] = {
+    asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, filter))
+  }
+
+  /**
+    * Reads the contents of the user-specified path based on the given [[FileInputFormat]].
     * Depending on the provided [[FileProcessingMode]], the source
     * may periodically monitor (every `interval` ms) the path for new data
     * ([[FileProcessingMode.PROCESS_CONTINUOUSLY]]), or process
@@ -496,8 +530,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     * @param interval
    *          In the case of periodic path monitoring, this specifies the interval (in millis)
     *          between consecutive path scans
-    * @param filter
-    *          The files to be excluded from the processing
     * @return The data stream that represents the data read from the given file
     */
   @PublicEvolving
@@ -505,10 +537,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
       inputFormat: FileInputFormat[T],
       filePath: String,
       watchType: FileProcessingMode,
-      interval: Long,
-      filter: FilePathFilter): DataStream[T] = {
+      interval: Long): DataStream[T] = {
     val typeInfo = implicitly[TypeInformation[T]]
-    asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, filter, typeInfo))
+    asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))
   }
 
   /**

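The include/exclude pattern support from the subject line comes from the new GlobFilePathFilter in flink-core (see the diffstat above). Its constructor is not shown in this excerpt, so the snippet below is only a sketch that assumes it takes a list of include patterns and a list of exclude patterns in glob syntax; only the class name and its package are confirmed by this diff.

    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.flink.api.common.io.GlobFilePathFilter;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;

    public class GlobFilterSketch {

        // Assumed signature: GlobFilePathFilter(includePatterns, excludePatterns).
        public static TextInputFormat csvOnlyFormat(String path) {
            TextInputFormat format = new TextInputFormat(new Path(path));
            format.setFilesFilter(new GlobFilePathFilter(
                Arrays.asList("**/*.csv"),                     // include only CSV files
                Collections.singletonList("**/*_COPYING_*"))); // exclude in-flight copies
            return format;
        }
    }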
http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index d540a92..a265c0a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
@@ -112,8 +112,9 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
		// create the monitoring source along with the necessary readers.
		TestingSinkFunction sink = new TestingSinkFunction();
		TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
+		format.setFilesFilter(FilePathFilter.createDefaultFilter());
		DataStream<String> inputStream = env.readFile(format, localFsURI,
-			FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL, FilePathFilter.createDefaultFilter());
+			FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL);
 
                inputStream.flatMap(new FlatMapFunction<String, String>() {
                        @Override

Reply via email to