luoyuxia commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1230407539
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -68,6 +68,17 @@ public class FileSystemConnectorOptions {
+ "The statistics reporting is a heavy operation in some cases,"
+ "this config allows users to choose the statistics type according to different situations.");
+ public static final ConfigOption<String> SOURCE_REGEX_PATTERN =
+ key("source.regex-pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The regex pattern to filter files or directories. "
+ + "This regex pattern should be a relative path for the `path` option."
Review Comment:
Please remember to update the description, since IIUC it will be an absolute path.
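To illustrate why the description matters, here is a minimal sketch. It assumes the filter applies `Matcher#matches()` to the file's full path string; the PR's actual matching code is not shown in this thread, and the exact string form of the path (for example, whether it carries a scheme) depends on the implementation. Under that assumption, a pattern written relative to `path` never matches. `AbsolutePathRegexDemo` and its paths are hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical helper: assumes the filter calls Matcher#matches() on the whole
// path string, so the pattern has to describe the full absolute path.
final class AbsolutePathRegexDemo {
    private AbsolutePathRegexDemo() {}

    static boolean accepts(String regex, String absolutePath) {
        return Pattern.compile(regex).matcher(absolutePath).matches();
    }

    public static void main(String[] args) {
        String file = "/data/logs/2023/part-0.csv";
        // A pattern written relative to the `path` option does not match...
        System.out.println(accepts("2023/.*\\.csv", file)); // false
        // ...while a pattern covering the absolute path does.
        System.out.println(accepts("/data/logs/2023/.*\\.csv", file)); // true
    }
}
```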
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories. Each file matched the given regex pattern becomes one split; this enumerator
+ * does not split files into smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ *
+ * <p>Compared to {@link NonSplittingRecursiveEnumerator}, this enumerator will enumerate all files
+ * even through its parent directory is filtered out by the file filter.
+ */
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
Review Comment:
nit: mark it as `@Internal`.
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.compression.StandardDeCompressors;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories, and creates a separate split for each file block.
+ *
+ * <p>Please note that file blocks are only exposed by some file systems, such as HDFS. File systems
+ * that do not expose block information will not create multiple file splits per file, but keep the
+ * files as one source split.
+ *
+ * <p>Files with suffixes corresponding to known compression formats (for example '.gzip', '.bz2',
+ * ...) will not be split. See {@link StandardDeCompressors} for a list of known formats and
+ * suffixes.
+ *
+ * <p>Compared to {@link BlockSplittingRecursiveEnumerator}, this enumerator will enumerate all
+ * files even through its parent directory is filtered out by the file filter.
+ */
+public class BlockSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveAllDirEnumerator {
Review Comment:
nit: also mark it as `@Internal`.
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,86 @@
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
+
+ /** The custom filter predicate to filter out unwanted files. */
+ private final Predicate<Path> fileFilter;
Review Comment:
Make `fileFilter` protected in `NonSplittingRecursiveEnumerator` so that we don't need to redeclare it here.
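A minimal sketch of the suggested shape, using hypothetical simplified stand-ins (`BaseEnumeratorSketch`, `AllDirEnumeratorSketch`) with `String` in place of Flink's `Path`: once the base class exposes the filter as `protected`, the subclass passes it to `super(...)` and reads the inherited field instead of keeping its own copy.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for NonSplittingRecursiveEnumerator.
class BaseEnumeratorSketch {
    /** Visible to subclasses, so they do not need to redeclare it. */
    protected final Predicate<String> fileFilter;

    BaseEnumeratorSketch(Predicate<String> fileFilter) {
        this.fileFilter = fileFilter;
    }
}

// Hypothetical stand-in for NonSplittingRecursiveAllDirEnumerator.
class AllDirEnumeratorSketch extends BaseEnumeratorSketch {
    AllDirEnumeratorSketch(Predicate<String> fileFilter) {
        super(fileFilter); // no private duplicate field in the subclass
    }

    boolean acceptsFile(String path) {
        return fileFilter.test(path); // uses the inherited protected field
    }
}
```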
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,86 @@
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
+
+ /** The custom filter predicate to filter out unwanted files. */
+ private final Predicate<Path> fileFilter;
+
+ /** The filter used to skip recursion in the hidden directories. */
Review Comment:
```suggestion
/** The filter used to skip hidden directories. */
```
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,86 @@
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
+
+ /** The custom filter predicate to filter out unwanted files. */
+ private final Predicate<Path> fileFilter;
+
+ /** The filter used to skip recursion in the hidden directories. */
+ private final DefaultFileFilter hiddenDirFilter = new DefaultFileFilter();
+
+ /**
+ * Creates a NonSplittingRegexEnumerator that enumerates all files whose path or parent path
Review Comment:
Why parent path?
```suggestion
* Creates a NonSplittingRegexEnumerator that enumerates all files whose path
```
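A minimal sketch of why "parent path" seems unnecessary, assuming the intended semantics are: recurse into every non-hidden directory, and test the regex only against each file's own full path. Since the parent path is already a prefix of that full path, a pattern such as `/data/2023/.*` selects everything below that directory without a separate parent-path rule. `RegexFileWalkSketch`, `Node` and `matchingFiles` are hypothetical; `Node` stands in for `FileStatus`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch of the "all dir" traversal described in the javadoc.
final class RegexFileWalkSketch {
    private RegexFileWalkSketch() {}

    /** Minimal in-memory file-tree node. */
    static final class Node {
        final String path;
        final List<Node> children; // null means this node is a file

        private Node(String path, List<Node> children) {
            this.path = path;
            this.children = children;
        }

        static Node file(String path) {
            return new Node(path, null);
        }

        static Node dir(String path, Node... children) {
            return new Node(path, Arrays.asList(children));
        }
    }

    /** Hidden-name rule used in this thread: a '.' or '_' prefix. */
    static boolean isHidden(String path) {
        String name = path.substring(path.lastIndexOf('/') + 1);
        return name.startsWith(".") || name.startsWith("_");
    }

    private static void collect(Node node, Pattern regex, List<String> out) {
        if (isHidden(node.path)) {
            return; // skip hidden files and hidden directories entirely
        }
        if (node.children == null) {
            if (regex.matcher(node.path).matches()) {
                out.add(node.path); // only the file's own full path is tested
            }
            return;
        }
        for (Node child : node.children) {
            collect(child, regex, out); // recurse even if the directory itself does not match
        }
    }

    static List<String> matchingFiles(Node root, String regex) {
        List<String> out = new ArrayList<>();
        collect(root, Pattern.compile(regex), out);
        return out;
    }
}
```

With a tree containing `/data/2023/a.csv`, `/data/2023/b.txt`, `/data/.tmp/c.csv` and `/data/_SUCCESS`, the pattern `/data/.*\.csv` yields only `/data/2023/a.csv`, even though the directory `/data` itself does not match the pattern.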
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableFactoryTest.java:
##########
@@ -49,6 +49,21 @@ public class FileSystemTableFactoryTest {
Column.physical("f1", DataTypes.BIGINT()),
Column.physical("f2", DataTypes.BIGINT()));
+ @Test
+ public void testSourceRegexPattern() {
Review Comment:
Do we really need this test, given that we already have an IT case for it?
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSourceStreamingITCase.java:
##########
@@ -88,4 +88,61 @@ public void testMonitorContinuously() throws Exception {
assertThat(actual).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6);
}
+
+ @Test
+ public void testRegexPath() throws Exception {
Review Comment:
nit
```suggestion
public void testSourceWithRegexPattern() throws Exception {
```
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSourceStreamingITCase.java:
##########
@@ -88,4 +88,61 @@ public void testMonitorContinuously() throws Exception {
assertThat(actual).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6);
}
+
+ @Test
+ public void testRegexPath() throws Exception {
Review Comment:
I don't think this test can verify whether `source.regex-pattern` takes effect, since it doesn't filter out any files.
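For the test to be meaningful, the fixture needs at least one file that the pattern must exclude, so the expected result is strictly smaller than "all files". A minimal sketch of such a fixture using plain `java.nio` (not Flink's test utilities, and not the ITCase itself); `RegexPatternFixture`, its file layout and its helpers are hypothetical, and matching against the absolute path follows the assumption discussed above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical fixture: lays out files so that a regex pattern has to exclude some of them.
final class RegexPatternFixture {
    private RegexPatternFixture() {}

    /** Creates 2023/a.csv, 2023/b.txt and 2022/c.csv under a fresh temp directory. */
    static Path create() {
        try {
            Path root = Files.createTempDirectory("regex-pattern-test");
            for (String rel : new String[] {"2023/a.csv", "2023/b.txt", "2022/c.csv"}) {
                Path file = root.resolve(rel);
                Files.createDirectories(file.getParent());
                Files.write(file, "1\n".getBytes(StandardCharsets.UTF_8));
            }
            return root;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Absolute path with '/' separators, so the same regex works on every OS. */
    static String absolute(Path p) {
        return p.toAbsolutePath().toString().replace('\\', '/');
    }

    /** Builds a regex matching {@code suffixRegex} below {@code root}. */
    static String under(Path root, String suffixRegex) {
        return Pattern.quote(absolute(root)) + suffixRegex;
    }

    /** Sorted relative paths of the regular files whose absolute path matches {@code regex}. */
    static List<String> matching(Path root, String regex) {
        Pattern pattern = Pattern.compile(regex);
        try (Stream<Path> files = Files.walk(root)) {
            return files.filter(Files::isRegularFile)
                    .filter(p -> pattern.matcher(absolute(p)).matches())
                    .map(p -> root.relativize(p).toString().replace('\\', '/'))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

An assertion in the spirit of the IT case would then expect only the rows from `2023/a.csv` when the pattern is `under(root, "/2023/.*\\.csv")`, which fails if the option is silently ignored.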
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,86 @@
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
+
+ /** The custom filter predicate to filter out unwanted files. */
+ private final Predicate<Path> fileFilter;
+
+ /** The filter used to skip recursion in the hidden directories. */
+ private final DefaultFileFilter hiddenDirFilter = new DefaultFileFilter();
+
+ /**
+ * Creates a NonSplittingRegexEnumerator that enumerates all files whose path or parent path
+ * matches the regex except hidden files. Hidden files are considered files where the filename
+ * starts with '.' or with '_'.
+ */
+ public NonSplittingRecursiveAllDirEnumerator(String pathPattern) {
Review Comment:
nit:
```suggestion
public NonSplittingRecursiveAllDirEnumerator(String pathRegexPattern) {
```
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,165 @@
+public class BlockSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveAllDirEnumerator {
Review Comment:
Why not make it extend `BlockSplittingRecursiveEnumerator`? That would make this class simpler, with less code and less complex logic.
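A minimal sketch of the suggested hierarchy, using hypothetical simplified stand-ins (`FsEntry`, `NonSplittingSketch`, `BlockSplittingSketch`, `BlockSplittingAllDirSketch`) rather than the real Flink classes and signatures. The idea it illustrates: block splitting only changes how a file becomes splits, and the "all dir" behavior only changes the traversal, so extending the block-splitting class means overriding the traversal alone and inheriting the splitting code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical in-memory stand-in for FileStatus. */
class FsEntry {
    final String path;
    final long length;
    final List<FsEntry> children; // null for files

    private FsEntry(String path, long length, List<FsEntry> children) {
        this.path = path;
        this.length = length;
        this.children = children;
    }

    static FsEntry file(String path, long length) {
        return new FsEntry(path, length, null);
    }

    static FsEntry dir(String path, FsEntry... children) {
        return new FsEntry(path, 0, Arrays.asList(children));
    }
}

/** Stand-in for the non-splitting enumerator: the filter also gates recursion. */
class NonSplittingSketch {
    protected final Predicate<String> fileFilter;

    NonSplittingSketch(Predicate<String> fileFilter) {
        this.fileFilter = fileFilter;
    }

    void addSplitsForPath(FsEntry entry, List<String> target) {
        if (!fileFilter.test(entry.path)) {
            return; // a rejected directory hides everything below it
        }
        if (entry.children == null) {
            convertToSplits(entry, target);
            return;
        }
        for (FsEntry child : entry.children) {
            addSplitsForPath(child, target);
        }
    }

    /** One split per file. */
    void convertToSplits(FsEntry file, List<String> target) {
        target.add(file.path);
    }
}

/** Stand-in for the block-splitting enumerator: only the split conversion changes. */
class BlockSplittingSketch extends NonSplittingSketch {
    private final long blockSize;

    BlockSplittingSketch(Predicate<String> fileFilter, long blockSize) {
        super(fileFilter);
        this.blockSize = blockSize;
    }

    @Override
    void convertToSplits(FsEntry file, List<String> target) {
        // One split per block, labelled "path@offset" (assumes length > 0).
        for (long offset = 0; offset < file.length; offset += blockSize) {
            target.add(file.path + "@" + offset);
        }
    }
}

/** Suggested shape: inherit block splitting, override only the traversal. */
class BlockSplittingAllDirSketch extends BlockSplittingSketch {
    private final Predicate<String> dirFilter; // true = descend into this directory

    BlockSplittingAllDirSketch(
            Predicate<String> fileFilter, Predicate<String> dirFilter, long blockSize) {
        super(fileFilter, blockSize);
        this.dirFilter = dirFilter;
    }

    @Override
    void addSplitsForPath(FsEntry entry, List<String> target) {
        if (entry.children == null) {
            if (fileFilter.test(entry.path)) {
                convertToSplits(entry, target); // inherited block-splitting logic
            }
            return;
        }
        if (!dirFilter.test(entry.path)) {
            return; // e.g. hidden directories
        }
        for (FsEntry child : entry.children) {
            addSplitsForPath(child, target);
        }
    }
}
```

The trade-off in this sketch is that the overridden traversal appears in both "all dir" classes (or would move into a shared helper), while the block-splitting code is no longer duplicated.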
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java:
##########
@@ -269,6 +271,19 @@ private SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit>
tableOptions
.getOptional(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL)
.ifPresent(fileSourceBuilder::monitorContinuously);
+ tableOptions
+ .getOptional(FileSystemConnectorOptions.SOURCE_REGEX_PATTERN)
+ .ifPresent(
+ regex -> {
Review Comment:
nit: the braces `{ }` around the lambda body can be removed.
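Assuming the lambda body is a single statement, the block body and the expression body behave identically. A minimal sketch with `Optional#ifPresent`; `LambdaBodyDemo` is hypothetical, and the list `out` stands in for the builder call in the PR, which is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical demo: only the lambda syntax matters here.
final class LambdaBodyDemo {
    private LambdaBodyDemo() {}

    /** Block-bodied lambda: braces, a statement and a semicolon. */
    static List<String> withBlockBody(Optional<String> regexOption) {
        List<String> out = new ArrayList<>();
        regexOption.ifPresent(
                regex -> {
                    out.add(regex);
                });
        return out;
    }

    /** Expression-bodied lambda: same behavior, no braces needed. */
    static List<String> withExpressionBody(Optional<String> regexOption) {
        List<String> out = new ArrayList<>();
        regexOption.ifPresent(regex -> out.add(regex));
        return out;
    }
}
```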
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -68,6 +68,17 @@ public class FileSystemConnectorOptions {
+ "The statistics reporting is a heavy operation in some cases,"
+ "this config allows users to choose the statistics type according to different situations.");
+ public static final ConfigOption<String> SOURCE_REGEX_PATTERN =
+ key("source.regex-pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The regex pattern to filter files or directories. "
+ + "This regex pattern should be a relative path for the `path` option."
Review Comment:
How about naming it `source.path.regex-pattern`?
##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/RegexFileFilter.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.core.fs.Path;
+
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * A file filter that filters out files based on the given path pattern and hidden files, see {@link
Review Comment:
nit:
```suggestion
* A file filter that filters out hidden files, see {@link DefaultFileFilter} and the files whose path doesn't match the given regex pattern.
```
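A minimal sketch of the filter the suggested wording describes: reject a path if its file name is hidden ('.' or '_' prefix, as described for the hidden-file rule in this thread) or if its full path does not match the regex. `RegexFileFilterSketch` is hypothetical and is not the PR's `RegexFileFilter`; `java.nio.file.Path` stands in for `org.apache.flink.core.fs.Path`.

```java
import java.nio.file.Path;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical sketch; java.nio Path stands in for Flink's Path type.
final class RegexFileFilterSketch implements Predicate<Path> {
    private final Pattern pattern;

    RegexFileFilterSketch(String pathRegexPattern) {
        // Compile once; test() runs for every enumerated file.
        this.pattern = Pattern.compile(pathRegexPattern);
    }

    @Override
    public boolean test(Path path) {
        Path name = path.getFileName();
        String fileName = name == null ? "" : name.toString();
        // Hidden-file rule: a '.' or '_' prefix.
        boolean hidden = fileName.startsWith(".") || fileName.startsWith("_");
        // Normalize separators so the same regex works on every OS.
        String fullPath = path.toString().replace('\\', '/');
        return !hidden && pattern.matcher(fullPath).matches();
    }
}
```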
--
This is an automated message from the Apache Git Service.