[FLINK-3677] FileInputFormat: Allow to specify include/exclude file name patterns
This closes #2109 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48109104 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48109104 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48109104 Branch: refs/heads/flip-6 Commit: 4810910431e01bf143ae77a6e93a86f2fafbccd0 Parents: 259a3a5 Author: Ivan Mushketyk <ivan.mushke...@gmail.com> Authored: Tue Jun 14 22:44:19 2016 +0100 Committer: Maximilian Michels <m...@apache.org> Committed: Thu Aug 25 16:08:18 2016 +0200 ---------------------------------------------------------------------- flink-core/pom.xml | 7 + .../flink/api/common/io/FileInputFormat.java | 20 ++- .../flink/api/common/io/FilePathFilter.java | 69 ++++++++ .../flink/api/common/io/GlobFilePathFilter.java | 111 ++++++++++++ .../java/org/apache/flink/core/fs/Path.java | 10 +- .../flink/core/fs/local/LocalFileStatus.java | 8 + .../flink/api/common/io/DefaultFilterTest.java | 70 ++++++++ .../api/common/io/FileInputFormatTest.java | 174 ++++++++++++------- .../api/common/io/GlobFilePathFilterTest.java | 141 +++++++++++++++ .../ContinuousFileMonitoringFunctionITCase.java | 4 +- .../hdfstests/ContinuousFileMonitoringTest.java | 13 +- .../environment/StreamExecutionEnvironment.java | 72 ++++++-- .../ContinuousFileMonitoringFunction.java | 8 +- .../api/functions/source/FilePathFilter.java | 66 ------- .../api/scala/StreamExecutionEnvironment.scala | 43 ++++- ...ontinuousFileProcessingCheckpointITCase.java | 5 +- 16 files changed, 650 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/pom.xml ---------------------------------------------------------------------- diff --git a/flink-core/pom.xml b/flink-core/pom.xml index 9e290a0..dcb2599 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -103,6 +103,13 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + <scope>test</scope> + </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index 72d6061..d0f5166 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -33,6 +33,7 @@ import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +71,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS * The fraction that the last split may be larger than the others. */ private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f; - + /** * The timeout (in milliseconds) to wait for a filesystem stream to respond. */ @@ -218,7 +219,12 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS * structure is enabled. */ protected boolean enumerateNestedFiles = false; - + + /** + * Files filter for determining what files/directories should be included. + */ + private FilePathFilter filesFilter = new GlobFilePathFilter(); + // -------------------------------------------------------------------------------------------- // Constructors // -------------------------------------------------------------------------------------------- @@ -332,6 +338,10 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS return splitLength; } + public void setFilesFilter(FilePathFilter filesFilter) { + this.filesFilter = Preconditions.checkNotNull(filesFilter, "Files filter should not be null"); + } + // -------------------------------------------------------------------------------------------- // Pre-flight: Configuration, Splits, Sampling // -------------------------------------------------------------------------------------------- @@ -625,7 +635,9 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS */ protected boolean acceptFile(FileStatus fileStatus) { final String name = fileStatus.getPath().getName(); - return !name.startsWith("_") && !name.startsWith("."); + return !name.startsWith("_") + && !name.startsWith(".") + && !filesFilter.filterPath(fileStatus.getPath()); } /** @@ -735,7 +747,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS "File Input (unknown file)" : "File Input (" + this.filePath.toString() + ')'; } - + // ============================================================================================ /** http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java new file mode 100644 index 0000000..4ab896c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FilePathFilter.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.io; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.core.fs.Path; + +import java.io.Serializable; + +/** + * The {@link #filterPath(Path)} method is responsible for deciding if a path is eligible for further + * processing or not. This can serve to exclude temporary or partial files that + * are still being written. + */ +@PublicEvolving +public abstract class FilePathFilter implements Serializable { + + // Name of an unfinished Hadoop file + public static final String HADOOP_COPYING = "_COPYING_"; + + public static FilePathFilter createDefaultFilter() { + return new DefaultFilter(); + } + + /** + * Returns {@code true} if the {@code filePath} given is to be + * ignored when processing a directory, e.g. + * <pre> + * {@code + * + * public boolean filterPaths(Path filePath) { + * return filePath.getName().startsWith(".") || filePath.getName().contains("_COPYING_"); + * } + * }</pre> + */ + public abstract boolean filterPath(Path filePath); + + /** + * The default file path filtering method and is used + * if no other such function is provided. This filter leaves out + * files starting with ".", "_", and "_COPYING_". + */ + public static class DefaultFilter extends FilePathFilter { + + DefaultFilter() {} + + @Override + public boolean filterPath(Path filePath) { + return filePath == null || + filePath.getName().startsWith(".") || + filePath.getName().startsWith("_") || + filePath.getName().contains(HADOOP_COPYING); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java new file mode 100644 index 0000000..4aaf481 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GlobFilePathFilter.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.Path; + +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; +import java.nio.file.PathMatcher; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Class for determining if a particular file should be included or excluded + * based on a set of include and exclude glob filters. + * + * Glob filter support the following expressions: + * <ul> + * <li>* - matches any number of any characters including none</li> + * <li>** - matches any file in all subdirectories</li> + * <li>? - matches any single character</li> + * <li>[abc] - matches one of the characters listed in a brackets</li> + * <li>[a-z] - matches one character from the range given in the brackets</li> + * </ul> + * + * <p> If does not match an include pattern it is excluded. If it matches and include + * pattern but also matches an exclude pattern it is excluded. + * + * <p> If no patterns are provided all files are included + */ +@Internal +public class GlobFilePathFilter extends FilePathFilter { + + private static final long serialVersionUID = 1L; + + private final List<PathMatcher> includeMatchers; + private final List<PathMatcher> excludeMatchers; + + /** + * Constructor for GlobFilePathFilter that will match all files + */ + public GlobFilePathFilter() { + this(Collections.<String>emptyList(), Collections.<String>emptyList()); + } + + /** + * Constructor for GlobFilePathFilter + * + * @param includePatterns glob patterns for files to include + * @param excludePatterns glob patterns for files to exclude + */ + public GlobFilePathFilter(List<String> includePatterns, List<String> excludePatterns) { + includeMatchers = buildPatterns(includePatterns); + excludeMatchers = buildPatterns(excludePatterns); + } + + private List<PathMatcher> buildPatterns(List<String> patterns) { + FileSystem fileSystem = FileSystems.getDefault(); + List<PathMatcher> matchers = new ArrayList<>(); + + for (String patternStr : patterns) { + matchers.add(fileSystem.getPathMatcher("glob:" + patternStr)); + } + + return matchers; + } + + @Override + public boolean filterPath(Path filePath) { + if (includeMatchers.isEmpty() && excludeMatchers.isEmpty()) { + return false; + } + + for (PathMatcher mather : includeMatchers) { + if (mather.matches(Paths.get(filePath.getPath()))) { + return shouldExclude(filePath); + } + } + + return true; + } + + private boolean shouldExclude(Path filePath) { + for (PathMatcher matcher : excludeMatchers) { + if (matcher.matches(Paths.get(filePath.getPath()))) { + return true; + } + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/core/fs/Path.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java index 4c77199..7adfa42 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/Path.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/Path.java @@ -145,7 +145,7 @@ public class Path implements IOReadableWritable, Serializable { } /** - * Checks if the provided path string is either null or has zero length and throws + * Checks if the provided path string is either null or has zero length and throws * a {@link IllegalArgumentException} if any of the two conditions apply. * In addition, leading and tailing whitespaces are removed. * @@ -333,6 +333,14 @@ public class Path implements IOReadableWritable, Serializable { } /** + * Return full path. + * @return full path + */ + public String getPath() { + return uri.getPath(); + } + + /** * Returns the parent of a path, i.e., everything that precedes the last separator * or <code>null</code> if at root. * http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java index 0aebd75..3e127ff 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java @@ -102,4 +102,12 @@ public class LocalFileStatus implements FileStatus { public File getFile() { return this.file; } + + @Override + public String toString() { + return "LocalFileStatus{" + + "file=" + file + + ", path=" + path + + '}'; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java new file mode 100644 index 0000000..6956518 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.io; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.flink.core.fs.Path; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public class DefaultFilterTest { + @Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + {"file.txt", false}, + + {".file.txt", true}, + {"dir/.file.txt", true}, + {".dir/file.txt", false}, + + {"_file.txt", true}, + {"dir/_file.txt", true}, + {"_dir/file.txt", false}, + + // Check filtering Hadoop's unfinished files + {FilePathFilter.HADOOP_COPYING, true}, + {"dir/" + FilePathFilter.HADOOP_COPYING, true}, + {FilePathFilter.HADOOP_COPYING + "/file.txt", false}, + }); + } + + private final boolean shouldFilter; + private final String filePath; + + public DefaultFilterTest(String filePath, boolean shouldFilter) { + this.filePath = filePath; + this.shouldFilter = shouldFilter; + } + + @Test + public void test() { + FilePathFilter defaultFilter = FilePathFilter.createDefaultFilter(); + Path path = new Path(filePath); + assertEquals( + String.format("File: %s", filePath), + shouldFilter, + defaultFilter.filterPath(path)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java index ae8802b..dcd6583 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.io; +import com.google.common.collect.Lists; import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; @@ -27,16 +28,17 @@ import org.apache.flink.testutils.TestFileUtils; import org.apache.flink.types.IntValue; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.BufferedOutputStream; -import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; -import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.util.Collections; import static org.junit.Assert.*; @@ -45,6 +47,9 @@ import static org.junit.Assert.*; */ public class FileInputFormatTest { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + // ------------------------------------------------------------------------ // Statistics // ------------------------------------------------------------------------ @@ -257,41 +262,21 @@ public class FileInputFormatTest { public void testIgnoredUnderscoreFiles() { try { final String contents = "CONTENTS"; - + // create some accepted, some ignored files - - File tempDir = new File(System.getProperty("java.io.tmpdir")); - File f; - do { - f = new File(tempDir, TestFileUtils.randomFileName("")); - } - while (f.exists()); - assertTrue(f.mkdirs()); - f.deleteOnExit(); - - File child1 = new File(f, "dataFile1.txt"); - File child2 = new File(f, "another_file.bin"); - File luigiFile = new File(f, "_luigi"); - File success = new File(f, "_SUCCESS"); - - File[] files = { child1, child2, luigiFile, success }; - - for (File child : files) { - child.deleteOnExit(); - - BufferedWriter out = new BufferedWriter(new FileWriter(child)); - try { - out.write(contents); - } finally { - out.close(); - } - } - + + File child1 = temporaryFolder.newFile("dataFile1.txt"); + File child2 = temporaryFolder.newFile("another_file.bin"); + File luigiFile = temporaryFolder.newFile("_luigi"); + File success = temporaryFolder.newFile("_SUCCESS"); + + createTempFiles(contents.getBytes(), child1, child2, luigiFile, success); + // test that only the valid files are accepted final DummyFileInputFormat format = new DummyFileInputFormat(); - format.setFilePath(f.toURI().toString()); + format.setFilePath(temporaryFolder.getRoot().toURI().toString()); format.configure(new Configuration()); FileInputSplit[] splits = format.createInputSplits(1); @@ -314,43 +299,95 @@ public class FileInputFormatTest { } @Test + public void testExcludeFiles() { + try { + final String contents = "CONTENTS"; + + // create some accepted, some ignored files + + File child1 = temporaryFolder.newFile("dataFile1.txt"); + File child2 = temporaryFolder.newFile("another_file.bin"); + + File[] files = { child1, child2 }; + + createTempFiles(contents.getBytes(), files); + + // test that only the valid files are accepted + + Configuration configuration = new Configuration(); + + final DummyFileInputFormat format = new DummyFileInputFormat(); + format.setFilePath(temporaryFolder.getRoot().toURI().toString()); + format.configure(configuration); + format.setFilesFilter(new GlobFilePathFilter( + Collections.singletonList("**"), + Collections.singletonList("**/another_file.bin"))); + FileInputSplit[] splits = format.createInputSplits(1); + + Assert.assertEquals(1, splits.length); + + final URI uri1 = splits[0].getPath().toUri(); + + final URI childUri1 = child1.toURI(); + + Assert.assertEquals(uri1, childUri1); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test + public void testReadMultiplePatterns() { + try { + final String contents = "CONTENTS"; + + // create some accepted, some ignored files + + File child1 = temporaryFolder.newFile("dataFile1.txt"); + File child2 = temporaryFolder.newFile("another_file.bin"); + createTempFiles(contents.getBytes(), child1, child2); + + // test that only the valid files are accepted + + Configuration configuration = new Configuration(); + + final DummyFileInputFormat format = new DummyFileInputFormat(); + format.setFilePath(temporaryFolder.getRoot().toURI().toString()); + format.configure(configuration); + format.setFilesFilter(new GlobFilePathFilter( + Collections.singletonList("**"), + Lists.newArrayList("**/another_file.bin", "**/dataFile1.txt") + )); + FileInputSplit[] splits = format.createInputSplits(1); + + Assert.assertEquals(0, splits.length); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + } + + @Test public void testGetStatsIgnoredUnderscoreFiles() { try { - final long SIZE = 2048; + final int SIZE = 2048; final long TOTAL = 2*SIZE; // create two accepted and two ignored files - File tempDir = new File(System.getProperty("java.io.tmpdir")); - File f; - do { - f = new File(tempDir, TestFileUtils.randomFileName("")); - } - while (f.exists()); - - assertTrue(f.mkdirs()); - f.deleteOnExit(); - - File child1 = new File(f, "dataFile1.txt"); - File child2 = new File(f, "another_file.bin"); - File luigiFile = new File(f, "_luigi"); - File success = new File(f, "_SUCCESS"); - - File[] files = { child1, child2, luigiFile, success }; + File child1 = temporaryFolder.newFile("dataFile1.txt"); + File child2 = temporaryFolder.newFile("another_file.bin"); + File luigiFile = temporaryFolder.newFile("_luigi"); + File success = temporaryFolder.newFile("_SUCCESS"); - for (File child : files) { - child.deleteOnExit(); + createTempFiles(new byte[SIZE], child1, child2, luigiFile, success); - BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(child)); - try { - for (long bytes = SIZE; bytes > 0; bytes--) { - out.write(0); - } - } finally { - out.close(); - } - } final DummyFileInputFormat format = new DummyFileInputFormat(); - format.setFilePath(f.toURI().toString()); + format.setFilePath(temporaryFolder.getRoot().toURI().toString()); format.configure(new Configuration()); // check that only valid files are used for statistics computation @@ -406,7 +443,20 @@ public class FileInputFormatTest { } // ------------------------------------------------------------------------ - + + private void createTempFiles(byte[] contents, File... files) throws IOException { + for (File child : files) { + child.deleteOnExit(); + + BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(child)); + try { + out.write(contents); + } finally { + out.close(); + } + } + } + private class DummyFileInputFormat extends FileInputFormat<IntValue> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java new file mode 100644 index 0000000..bced076 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.io; + +import org.apache.flink.core.fs.Path; +import org.junit.Test; + +import java.util.Collections; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class GlobFilePathFilterTest { + @Test + public void defaultConstructorCreateMatchAllFilter() { + GlobFilePathFilter matcher = new GlobFilePathFilter(); + assertFalse(matcher.filterPath(new Path("dir/file.txt"))); + } + + @Test + public void matchAllFilesByDefault() { + GlobFilePathFilter matcher = new GlobFilePathFilter( + Collections.<String>emptyList(), + Collections.<String>emptyList()); + + assertFalse(matcher.filterPath(new Path("dir/file.txt"))); + } + + @Test + public void excludeFilesNotInIncludePatterns() { + GlobFilePathFilter matcher = new GlobFilePathFilter( + Collections.singletonList("dir/*"), + Collections.<String>emptyList()); + + assertFalse(matcher.filterPath(new Path("dir/file.txt"))); + assertTrue(matcher.filterPath(new Path("dir1/file.txt"))); + } + + @Test + public void excludeFilesIfMatchesExclude() { + GlobFilePathFilter matcher = new GlobFilePathFilter( + Collections.singletonList("dir/*"), + Collections.singletonList("dir/file.txt")); + + assertTrue(matcher.filterPath(new Path("dir/file.txt"))); + } + + @Test + public void includeFileWithAnyCharacterMatcher() { + GlobFilePathFilter matcher = new GlobFilePathFilter( + Collections.singletonList("dir/?.txt"), + Collections.<String>emptyList()); + + assertFalse(matcher.filterPath(new Path("dir/a.txt"))); + assertTrue(matcher.filterPath(new Path("dir/aa.txt"))); + } + + @Test + public void includeFileWithCharacterSetMatcher() { + GlobFilePathFilter matcher = new GlobFilePathFilter( + Collections.singletonList("dir/[acd].txt"), + Collections.<String>emptyList()); + + assertFalse(matcher.filterPath(new Path("dir/a.txt"))); + assertFalse(matcher.filterPath(new Path("dir/c.txt"))); + assertFalse(matcher.filterPath(new Path("dir/d.txt"))); + assertTrue(matcher.filterPath(new Path("dir/z.txt"))); + } + + @Test + public void includeFileWithCharacterRangeMatcher() { + GlobFilePathFilter matcher = new GlobFilePathFilter( + Collections.singletonList("dir/[a-d].txt"), + Collections.<String>emptyList()); + + assertFalse(matcher.filterPath(new Path("dir/a.txt"))); + assertFalse(matcher.filterPath(new Path("dir/b.txt"))); + assertFalse(matcher.filterPath(new Path("dir/c.txt"))); + assertFalse(matcher.filterPath(new Path("dir/d.txt"))); + assertTrue(matcher.filterPath(new Path("dir/z.txt"))); + } + + @Test + public void excludeHDFSFile() { + GlobFilePathFilter matcher = new GlobFilePathFilter( + Collections.singletonList("**"), + Collections.singletonList("/dir/file2.txt")); + + assertFalse(matcher.filterPath(new Path("hdfs:///dir/file1.txt"))); + assertTrue(matcher.filterPath(new Path("hdfs:///dir/file2.txt"))); + assertFalse(matcher.filterPath(new Path("hdfs:///dir/file3.txt"))); + } + + @Test + public void excludeFilenameWithStart() { + GlobFilePathFilter matcher = new GlobFilePathFilter( + Collections.singletonList("**"), + Collections.singletonList("\\*")); + + assertTrue(matcher.filterPath(new Path("*"))); + assertFalse(matcher.filterPath(new Path("**"))); + assertFalse(matcher.filterPath(new Path("other.txt"))); + } + + @Test + public void singleStarPattern() { + GlobFilePathFilter matcher = new GlobFilePathFilter( + Collections.singletonList("*"), + Collections.<String>emptyList()); + + assertFalse(matcher.filterPath(new Path("a"))); + assertTrue(matcher.filterPath(new Path("a/b"))); + assertTrue(matcher.filterPath(new Path("a/b/c"))); + } + + @Test + public void doubleStarPattern() { + GlobFilePathFilter matcher = new GlobFilePathFilter( + Collections.singletonList("**"), + Collections.<String>emptyList()); + + assertFalse(matcher.filterPath(new Path("a"))); + assertFalse(matcher.filterPath(new Path("a/b"))); + assertFalse(matcher.filterPath(new Path("a/b/c"))); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java index e6cd5d9..663345c 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java @@ -26,7 +26,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.source.FilePathFilter; +import org.apache.flink.api.common.io.FilePathFilter; import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; @@ -122,9 +122,9 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); + format.setFilesFilter(FilePathFilter.createDefaultFilter()); ContinuousFileMonitoringFunction<String> monitoringFunction = new ContinuousFileMonitoringFunction<>(format, hdfsURI, - FilePathFilter.createDefaultFilter(), FileProcessingMode.PROCESS_CONTINUOUSLY, env.getParallelism(), INTERVAL); http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java index def9378..4aadaec 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java @@ -26,7 +26,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; -import org.apache.flink.streaming.api.functions.source.FilePathFilter; +import org.apache.flink.api.common.io.FilePathFilter; import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; @@ -216,8 +216,9 @@ public class ContinuousFileMonitoringTest { } TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); + format.setFilesFilter(new PathFilter()); ContinuousFileMonitoringFunction<String> monitoringFunction = - new ContinuousFileMonitoringFunction<>(format, hdfsURI, new PathFilter(), + new ContinuousFileMonitoringFunction<>(format, hdfsURI, FileProcessingMode.PROCESS_ONCE, 1, INTERVAL); monitoringFunction.open(new Configuration()); @@ -242,8 +243,9 @@ public class ContinuousFileMonitoringTest { fc.start(); TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); + format.setFilesFilter(FilePathFilter.createDefaultFilter()); ContinuousFileMonitoringFunction<String> monitoringFunction = - new ContinuousFileMonitoringFunction<>(format, hdfsURI, FilePathFilter.createDefaultFilter(), + new ContinuousFileMonitoringFunction<>(format, hdfsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL); monitoringFunction.open(new Configuration()); @@ -291,8 +293,9 @@ public class ContinuousFileMonitoringTest { Assert.assertTrue(fc.getFilesCreated().size() >= 1); TextInputFormat format = new TextInputFormat(new Path(hdfsURI)); + format.setFilesFilter(FilePathFilter.createDefaultFilter()); ContinuousFileMonitoringFunction<String> monitoringFunction = - new ContinuousFileMonitoringFunction<>(format, hdfsURI, FilePathFilter.createDefaultFilter(), + new ContinuousFileMonitoringFunction<>(format, hdfsURI, FileProcessingMode.PROCESS_ONCE, 1, INTERVAL); monitoringFunction.open(new Configuration()); @@ -427,7 +430,7 @@ public class ContinuousFileMonitoringTest { assert (hdfs != null); org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx); - Assert.assertTrue (!hdfs.exists(file)); + Assert.assertFalse(hdfs.exists(file)); org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx); FSDataOutputStream stream = hdfs.create(tmp); http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 1913a36..ead9564 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -52,7 +52,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction; -import org.apache.flink.streaming.api.functions.source.FilePathFilter; +import org.apache.flink.api.common.io.FilePathFilter; import org.apache.flink.streaming.api.functions.source.FileReadFunction; import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; @@ -917,11 +917,11 @@ public abstract class StreamExecutionEnvironment { Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty."); TextInputFormat format = new TextInputFormat(new Path(filePath)); + format.setFilesFilter(FilePathFilter.createDefaultFilter()); TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; format.setCharsetName(charsetName); - return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, - FilePathFilter.createDefaultFilter(), typeInfo); + return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo); } /** @@ -952,7 +952,52 @@ public abstract class StreamExecutionEnvironment { */ public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath) { - return readFile(inputFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1, FilePathFilter.createDefaultFilter()); + return readFile(inputFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1); + } + + /** + * + * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}. Depending + * on the provided {@link FileProcessingMode}. + * <p> + * See {@link #readFile(FileInputFormat, String, FileProcessingMode, long)} + * + * @param inputFormat + * The input format used to create the data stream + * @param filePath + * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path") + * @param watchType + * The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit + * @param interval + * In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans + * @param filter + * The files to be excluded from the processing + * @param <OUT> + * The type of the returned data stream + * @return The data stream that represents the data read from the given file + * + * @deprecated Use {@link FileInputFormat#setFilesFilter(FilePathFilter)} to set a filter and + * {@link StreamExecutionEnvironment#readFile(FileInputFormat, String, FileProcessingMode, long)} + * + */ + @PublicEvolving + @Deprecated + public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, + String filePath, + FileProcessingMode watchType, + long interval, + FilePathFilter filter) { + inputFormat.setFilesFilter(filter); + + TypeInformation<OUT> typeInformation; + try { + typeInformation = TypeExtractor.getInputFormatTypes(inputFormat); + } catch (Exception e) { + throw new InvalidProgramException("The type returned by the input format could not be " + + "automatically determined. Please specify the TypeInformation of the produced type " + + "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead."); + } + return readFile(inputFormat, filePath, watchType, interval, typeInformation); } /** @@ -986,8 +1031,6 @@ public abstract class StreamExecutionEnvironment { * The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit * @param interval * In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans - * @param filter - * The files to be excluded from the processing * @param <OUT> * The type of the returned data stream * @return The data stream that represents the data read from the given file @@ -996,8 +1039,7 @@ public abstract class StreamExecutionEnvironment { public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, - long interval, - FilePathFilter filter) { + long interval) { TypeInformation<OUT> typeInformation; try { @@ -1007,7 +1049,7 @@ public abstract class StreamExecutionEnvironment { "automatically determined. Please specify the TypeInformation of the produced type " + "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead."); } - return readFile(inputFormat, filePath, watchType, interval, filter, typeInformation); + return readFile(inputFormat, filePath, watchType, interval, typeInformation); } /** @@ -1057,8 +1099,6 @@ public abstract class StreamExecutionEnvironment { * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path") * @param watchType * The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit - * @param filter - * The files to be excluded from the processing * @param typeInformation * Information on the type of the elements in the output stream * @param interval @@ -1072,7 +1112,6 @@ public abstract class StreamExecutionEnvironment { String filePath, FileProcessingMode watchType, long interval, - FilePathFilter filter, TypeInformation<OUT> typeInformation) { Preconditions.checkNotNull(inputFormat, "InputFormat must not be null."); @@ -1080,7 +1119,7 @@ public abstract class StreamExecutionEnvironment { Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty."); inputFormat.setFilePath(filePath); - return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, filter, interval); + return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval); } /** @@ -1250,8 +1289,7 @@ public abstract class StreamExecutionEnvironment { if (inputFormat instanceof FileInputFormat) { FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat; source = createFileInput(format, typeInfo, "Custom File source", - FileProcessingMode.PROCESS_ONCE, - FilePathFilter.createDefaultFilter(), -1); + FileProcessingMode.PROCESS_ONCE, -1); } else { source = createInput(inputFormat, typeInfo, "Custom Source"); } @@ -1270,14 +1308,12 @@ public abstract class StreamExecutionEnvironment { TypeInformation<OUT> typeInfo, String sourceName, FileProcessingMode monitoringMode, - FilePathFilter pathFilter, long interval) { Preconditions.checkNotNull(inputFormat, "Unspecified file input format."); Preconditions.checkNotNull(typeInfo, "Unspecified output type information."); Preconditions.checkNotNull(sourceName, "Unspecified name for the source."); Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode."); - Preconditions.checkNotNull(pathFilter, "Unspecified path name filtering function."); Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) || interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL, @@ -1286,7 +1322,7 @@ public abstract class StreamExecutionEnvironment { ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>( inputFormat, inputFormat.getFilePath().toString(), - pathFilter, monitoringMode, getParallelism(), interval); + monitoringMode, getParallelism(), interval); ContinuousFileReaderOperator<OUT, ?> reader = new ContinuousFileReaderOperator<>(inputFormat); http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java index 8ff4a2a..d36daab 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api.functions.source; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.FilePathFilter; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; @@ -81,15 +82,13 @@ public class ContinuousFileMonitoringFunction<OUT> private Long globalModificationTime; - private FilePathFilter pathFilter; - private transient Object checkpointLock; private volatile boolean isRunning = true; public ContinuousFileMonitoringFunction( FileInputFormat<OUT> format, String path, - FilePathFilter filter, FileProcessingMode watchType, + FileProcessingMode watchType, int readerParallelism, long interval) { if (watchType != FileProcessingMode.PROCESS_ONCE && interval < MIN_MONITORING_INTERVAL) { @@ -98,7 +97,6 @@ public class ContinuousFileMonitoringFunction<OUT> } this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format."); this.path = Preconditions.checkNotNull(path, "Unspecified Path."); - this.pathFilter = Preconditions.checkNotNull(filter, "Unspecified File Path Filter."); this.interval = interval; this.watchType = watchType; @@ -274,7 +272,7 @@ public class ContinuousFileMonitoringFunction<OUT> */ private boolean shouldIgnore(Path filePath, long modificationTime) { assert (Thread.holdsLock(checkpointLock)); - boolean shouldIgnore = ((pathFilter != null && pathFilter.filterPath(filePath)) || modificationTime <= globalModificationTime); + boolean shouldIgnore = modificationTime <= globalModificationTime; if (shouldIgnore) { LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + globalModificationTime); } http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java deleted file mode 100644 index 1a359ab..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.api.functions.source; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.core.fs.Path; - -import java.io.Serializable; - -/** - * An interface to be implemented by the user when using the {@link ContinuousFileMonitoringFunction}. - * The {@link #filterPath(Path)} method is responsible for deciding if a path is eligible for further - * processing or not. This can serve to exclude temporary or partial files that - * are still being written. - */ -@PublicEvolving -public abstract class FilePathFilter implements Serializable { - - public static FilePathFilter createDefaultFilter() { - return new DefaultFilter(); - } - /** - * Returns {@code true} if the {@code filePath} given is to be - * ignored when processing a directory, e.g. - * <pre> - * {@code - * - * public boolean filterPaths(Path filePath) { - * return filePath.getName().startsWith(".") || filePath.getName().contains("_COPYING_"); - * } - * }</pre> - */ - public abstract boolean filterPath(Path filePath); - - /** - * The default file path filtering method and is used - * if no other such function is provided. This filter leaves out - * files starting with ".", "_", and "_COPYING_". - */ - public static class DefaultFilter extends FilePathFilter { - - DefaultFilter() {} - - @Override - public boolean filterPath(Path filePath) { - return filePath == null || - filePath.getName().startsWith(".") || - filePath.getName().startsWith("_") || - filePath.getName().contains("_COPYING_"); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index f6dab1e..9cb36a5 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala import com.esotericsoftware.kryo.Serializer import org.apache.flink.annotation.{Internal, Public, PublicEvolving} -import org.apache.flink.api.common.io.{FileInputFormat, InputFormat} +import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat} import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer @@ -467,6 +467,40 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { /** * Reads the contents of the user-specified path based on the given [[FileInputFormat]]. + * Depending on the provided [[FileProcessingMode]]. + * + * @param inputFormat + * The input format used to create the data stream + * @param filePath + * The path of the file, as a URI (e.g., "file:///some/local/file" or + * "hdfs://host:port/file/path") + * @param watchType + * The mode in which the source should operate, i.e. monitor path and react + * to new data, or process once and exit + * @param interval + * In the case of periodic path monitoring, this specifies the interval (in millis) + * between consecutive path scans + * @param filter + * The files to be excluded from the processing + * @return The data stream that represents the data read from the given file + * + * @deprecated Use {@link FileInputFormat#setFilesFilter(FilePathFilter)} to set a filter and + * {@link StreamExecutionEnvironment#readFile(FileInputFormat, + * String, FileProcessingMode, long)} + */ + @PublicEvolving + @Deprecated + def readFile[T: TypeInformation]( + inputFormat: FileInputFormat[T], + filePath: String, + watchType: FileProcessingMode, + interval: Long, + filter: FilePathFilter): DataStream[T] = { + asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, filter)) + } + + /** + * Reads the contents of the user-specified path based on the given [[FileInputFormat]]. * Depending on the provided [[FileProcessingMode]], the source * may periodically monitor (every `interval` ms) the path for new data * ([[FileProcessingMode.PROCESS_CONTINUOUSLY]]), or process @@ -496,8 +530,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * @param interval * In the case of periodic path monitoring, this specifies the interval (in millis) * between consecutive path scans - * @param filter - * The files to be excluded from the processing * @return The data stream that represents the data read from the given file */ @PublicEvolving @@ -505,10 +537,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { inputFormat: FileInputFormat[T], filePath: String, watchType: FileProcessingMode, - interval: Long, - filter: FilePathFilter): DataStream[T] = { + interval: Long): DataStream[T] = { val typeInfo = implicitly[TypeInformation[T]] - asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, filter, typeInfo)) + asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo)) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/48109104/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java index d540a92..a265c0a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java @@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; -import org.apache.flink.streaming.api.functions.source.FilePathFilter; +import org.apache.flink.api.common.io.FilePathFilter; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; @@ -112,8 +112,9 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran // create the monitoring source along with the necessary readers. TestingSinkFunction sink = new TestingSinkFunction(); TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI)); + format.setFilesFilter(FilePathFilter.createDefaultFilter()); DataStream<String> inputStream = env.readFile(format, localFsURI, - FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL, FilePathFilter.createDefaultFilter()); + FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL); inputStream.flatMap(new FlatMapFunction<String, String>() { @Override