luoyuxia commented on code in PR #22249:
URL: https://github.com/apache/flink/pull/22249#discussion_r1230407539


##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -68,6 +68,17 @@ public class FileSystemConnectorOptions {
                                     + "The statistics reporting is a heavy operation in some cases,"
                                     + "this config allows users to choose the statistics type according to different situations.");
 
+    public static final ConfigOption<String> SOURCE_REGEX_PATTERN =
+            key("source.regex-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The regex pattern to filter files or directories. "
+                                    + "This regex pattern should be a relative path for the `path` option."

Review Comment:
   Please remember to update the description since, IIUC, it'll be an absolute path rather than a relative one.
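   For illustration, a minimal sketch using only `java.util.regex`, assuming (as the comment above suggests) that the regex is matched with full-match semantics against the absolute path string. The class name `RegexPathDemo` and the sample paths are hypothetical:

   ```java
   import java.util.regex.Pattern;

   /** Hypothetical demo: one absolute path checked against a relative-style and an absolute-style regex. */
   class RegexPathDemo {
       static boolean matches(String regex, String path) {
           // Pattern.matches requires the whole input to match, not just a substring.
           return Pattern.matches(regex, path);
       }

       public static void main(String[] args) {
           String absolute = "/tmp/table/dt=2023-06-01/part-0.csv";
           // A pattern written relative to the `path` option does not match the absolute path.
           System.out.println(matches("dt=.*/part-.*\\.csv", absolute)); // false
           // A pattern spelled out from the absolute root does.
           System.out.println(matches("/tmp/table/dt=.*/part-.*\\.csv", absolute)); // true
       }
   }
   ```

   If that reading is right, the description should tell users to write the pattern against the full path (or to start it with `.*`).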



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories. Each file matching the given regex pattern becomes one split; this enumerator
+ * does not split files into smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ *
+ * <p>Compared to {@link NonSplittingRecursiveEnumerator}, this enumerator will enumerate all files
+ * even if their parent directory is filtered out by the file filter.
+ */
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {

Review Comment:
   nit:
   Mark it as `@Internal`.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.compression.StandardDeCompressors;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories, and creates a separate split for each file block.
+ *
+ * <p>Please note that file blocks are only exposed by some file systems, such as HDFS. File systems
+ * that do not expose block information will not create multiple file splits per file, but keep the
+ * files as one source split.
+ *
+ * <p>Files with suffixes corresponding to known compression formats (for example '.gzip', '.bz2',
+ * ...) will not be split. See {@link StandardDeCompressors} for a list of known formats and
+ * suffixes.
+ *
+ * <p>Compared to {@link BlockSplittingRecursiveEnumerator}, this enumerator will enumerate all
+ * files even if their parent directory is filtered out by the file filter.
+ */
+public class BlockSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveAllDirEnumerator {

Review Comment:
   nit: also mark it as `@Internal`.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories. Each file matching the given regex pattern becomes one split; this enumerator
+ * does not split files into smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ *
+ * <p>Compared to {@link NonSplittingRecursiveEnumerator}, this enumerator will enumerate all files
+ * even if their parent directory is filtered out by the file filter.
+ */
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
+
+    /** The custom filter predicate to filter out unwanted files. */
+    private final Predicate<Path> fileFilter;

Review Comment:
   Make `fileFilter` protected in `NonSplittingRecursiveEnumerator` so that we don't need to declare it again here.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories. Each file matching the given regex pattern becomes one split; this enumerator
+ * does not split files into smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ *
+ * <p>Compared to {@link NonSplittingRecursiveEnumerator}, this enumerator will enumerate all files
+ * even if their parent directory is filtered out by the file filter.
+ */
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
+
+    /** The custom filter predicate to filter out unwanted files. */
+    private final Predicate<Path> fileFilter;
+
+    /** The filter used to skip recursion in the hidden directories. */

Review Comment:
   ```suggestion
       /** The filter used to skip hidden directories. */
   ```



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories. Each file matching the given regex pattern becomes one split; this enumerator
+ * does not split files into smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ *
+ * <p>Compared to {@link NonSplittingRecursiveEnumerator}, this enumerator will enumerate all files
+ * even if their parent directory is filtered out by the file filter.
+ */
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
+
+    /** The custom filter predicate to filter out unwanted files. */
+    private final Predicate<Path> fileFilter;
+
+    /** The filter used to skip recursion in the hidden directories. */
+    private final DefaultFileFilter hiddenDirFilter = new DefaultFileFilter();
+
+    /**
+     * Creates a NonSplittingRegexEnumerator that enumerates all files whose path or parent path

Review Comment:
   Why parent path?
   ```suggestion
        * Creates a NonSplittingRegexEnumerator that enumerates all files whose path
   ```
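   To make the question concrete, a hypothetical sketch contrasting the two readings of the Javadoc; `matchesPathOnly` and `matchesPathOrParent` are illustrative names, not methods in the PR, and paths are plain POSIX-style strings:

   ```java
   import java.util.regex.Pattern;

   /** Hypothetical comparison of "path matches" vs. "path or parent path matches". */
   class PathVsParentMatch {
       static boolean matchesPathOnly(Pattern pattern, String path) {
           return pattern.matcher(path).matches();
       }

       static boolean matchesPathOrParent(Pattern pattern, String path) {
           int slash = path.lastIndexOf('/');
           // Parent of "/a/b.csv" is "/a"; root-level paths and paths without '/' get an empty parent.
           String parent = slash > 0 ? path.substring(0, slash) : "";
           return matchesPathOnly(pattern, path) || pattern.matcher(parent).matches();
       }
   }
   ```

   With the path-or-parent reading, a file is accepted just because its directory matches, even though its own path does not; with the path-only reading, the regex has to cover the file path itself.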



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableFactoryTest.java:
##########
@@ -49,6 +49,21 @@ public class FileSystemTableFactoryTest {
                     Column.physical("f1", DataTypes.BIGINT()),
                     Column.physical("f2", DataTypes.BIGINT()));
 
+    @Test
+    public void testSourceRegexPattern() {

Review Comment:
   Do we really need this test, given that we already have an IT for it?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSourceStreamingITCase.java:
##########
@@ -88,4 +88,61 @@ public void testMonitorContinuously() throws Exception {
 
         assertThat(actual).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6);
     }
+
+    @Test
+    public void testRegexPath() throws Exception {

Review Comment:
   nit
   ```suggestion
       public void testSourceWithRegexPattern() throws Exception {
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/file/table/FileSystemTableSourceStreamingITCase.java:
##########
@@ -88,4 +88,61 @@ public void testMonitorContinuously() throws Exception {
 
         assertThat(actual).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6);
     }
+
+    @Test
+    public void testRegexPath() throws Exception {

Review Comment:
   I think the test can't verify whether `source.regex-pattern` takes effect, since it doesn't filter out any files.
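   A hypothetical sketch of a fixture that would actually exercise the filter: one file that should match and one that should not. It uses plain `java.nio.file` and a stand-in `enumerate` method instead of the real table source, and assumes POSIX-style separators and full-path matching:

   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.List;
   import java.util.regex.Pattern;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;

   /** Hypothetical test helpers; enumerate() stands in for the file source, not the PR's code. */
   class RegexFilterFixture {
       /** Creates dir1/data.csv and dir2/data.csv under a fresh temp directory. */
       static Path create() {
           try {
               Path root = Files.createTempDirectory("regex-pattern-test");
               for (String dir : new String[] {"dir1", "dir2"}) {
                   Path d = Files.createDirectories(root.resolve(dir));
                   Files.write(d.resolve("data.csv"), (dir + "\n").getBytes());
               }
               return root;
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }

       /** Lists regular files under root whose absolute path fully matches the regex. */
       static List<Path> enumerate(Path root, String regex) {
           Pattern pattern = Pattern.compile(regex);
           try (Stream<Path> files = Files.walk(root)) {
               return files.filter(Files::isRegularFile)
                       .filter(p -> pattern.matcher(p.toAbsolutePath().toString()).matches())
                       .collect(Collectors.toList());
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   }
   ```

   An IT built on the same idea would write rows into both directories and assert that only the rows from `dir1` are read, which fails if the option is ignored.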



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/NonSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories. Each file matching the given regex pattern becomes one split; this enumerator
+ * does not split files into smaller "block" units.
+ *
+ * <p>The default instantiation of this enumerator filters files with the common hidden file
+ * prefixes '.' and '_'. A custom file filter can be specified.
+ *
+ * <p>Compared to {@link NonSplittingRecursiveEnumerator}, this enumerator will enumerate all files
+ * even if their parent directory is filtered out by the file filter.
+ */
+public class NonSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveEnumerator {
+
+    /** The custom filter predicate to filter out unwanted files. */
+    private final Predicate<Path> fileFilter;
+
+    /** The filter used to skip recursion in the hidden directories. */
+    private final DefaultFileFilter hiddenDirFilter = new DefaultFileFilter();
+
+    /**
+     * Creates a NonSplittingRegexEnumerator that enumerates all files whose path or parent path
+     * matches the regex except hidden files. Hidden files are considered files where the filename
+     * starts with '.' or with '_'.
+     */
+    public NonSplittingRecursiveAllDirEnumerator(String pathPattern) {

Review Comment:
   nit:
   ```suggestion
       public NonSplittingRecursiveAllDirEnumerator(String pathRegexPattern) {
   ```



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/BlockSplittingRecursiveAllDirEnumerator.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.compression.StandardDeCompressors;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Predicate;
+
+/**
+ * This {@code FileEnumerator} enumerates all files under the given paths recursively except the
+ * hidden directories, and creates a separate split for each file block.
+ *
+ * <p>Please note that file blocks are only exposed by some file systems, such as HDFS. File systems
+ * that do not expose block information will not create multiple file splits per file, but keep the
+ * files as one source split.
+ *
+ * <p>Files with suffixes corresponding to known compression formats (for example '.gzip', '.bz2',
+ * ...) will not be split. See {@link StandardDeCompressors} for a list of known formats and
+ * suffixes.
+ *
+ * <p>Compared to {@link BlockSplittingRecursiveEnumerator}, this enumerator will enumerate all
+ * files even if their parent directory is filtered out by the file filter.
+ */
+public class BlockSplittingRecursiveAllDirEnumerator extends NonSplittingRecursiveAllDirEnumerator {

Review Comment:
   Why not make it extend `BlockSplittingRecursiveEnumerator`? If we extend `BlockSplittingRecursiveEnumerator`, the class will be simpler, with less code and less complex logic.



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java:
##########
@@ -269,6 +271,19 @@ private SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit>
         tableOptions
                 .getOptional(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL)
                 .ifPresent(fileSourceBuilder::monitorContinuously);
+        tableOptions
+                .getOptional(FileSystemConnectorOptions.SOURCE_REGEX_PATTERN)
+                .ifPresent(
+                        regex -> {

Review Comment:
   nit:
   `{` can be removed.
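   The quoted hunk is cut off after `regex -> {`, so this sketch assumes the lambda body is a single statement; `applied` is a hypothetical sink standing in for whatever the real body calls:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Optional;

   /** Hypothetical comparison of a block-bodied and an expression-bodied lambda. */
   class LambdaBodyStyle {
       static List<String> withBlockBody(Optional<String> regex) {
           List<String> applied = new ArrayList<>();
           regex.ifPresent(
                   r -> {
                       applied.add(r); // braces and ';' are required in a block body
                   });
           return applied;
       }

       static List<String> withExpressionBody(Optional<String> regex) {
           List<String> applied = new ArrayList<>();
           regex.ifPresent(r -> applied.add(r)); // a single call needs no braces
           return applied;
       }
   }
   ```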



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemConnectorOptions.java:
##########
@@ -68,6 +68,17 @@ public class FileSystemConnectorOptions {
                                     + "The statistics reporting is a heavy operation in some cases,"
                                     + "this config allows users to choose the statistics type according to different situations.");
 
+    public static final ConfigOption<String> SOURCE_REGEX_PATTERN =
+            key("source.regex-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The regex pattern to filter files or directories. "
+                                    + "This regex pattern should be a relative path for the `path` option."

Review Comment:
   Should we use `source.path.regex-pattern` instead?



##########
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/enumerate/RegexFileFilter.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.enumerate;
+
+import org.apache.flink.core.fs.Path;
+
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+/**
+ * A file filter that filters out files based on the given path pattern and hidden files, see {@link

Review Comment:
   nit:
   ```suggestion
    * A file filter that filters out hidden files, see {@link DefaultFileFilter} and the files whose path doesn't match the given regex pattern.
   ```
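   A hypothetical sketch of the behavior the suggested Javadoc describes, written over `String` paths so it runs without Flink (the PR's class works on `org.apache.flink.core.fs.Path`); the hidden-name rule follows the '.' and '_' prefixes documented for the enumerators above:

   ```java
   import java.util.function.Predicate;
   import java.util.regex.Pattern;

   /** Hypothetical RegexFileFilter-like predicate over path strings. */
   class RegexFileFilterSketch implements Predicate<String> {
       private final Pattern pattern;

       RegexFileFilterSketch(String pathRegexPattern) {
           this.pattern = Pattern.compile(pathRegexPattern);
       }

       @Override
       public boolean test(String path) {
           String name = path.substring(path.lastIndexOf('/') + 1);
           // Hidden files: the file name starts with '.' or '_'.
           boolean hidden = name.startsWith(".") || name.startsWith("_");
           // Accept only non-hidden files whose whole path matches the regex.
           return !hidden && pattern.matcher(path).matches();
       }
   }
   ```

   Note that a predicate like this also rejects a directory such as `/data` for the pattern `.*\.csv`, which is presumably why the PR adds enumerators that keep recursing into directories the filter rejects.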


