Hisoka-X commented on code in PR #8507:
URL: https://github.com/apache/seatunnel/pull/8507#discussion_r1915904441


##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java:
##########
@@ -80,6 +86,225 @@
 public class OrcReadStrategy extends AbstractReadStrategy {
     private static final long MIN_SIZE = 16 * 1024;
 
+    private int batchReadRows = 1024;

Review Comment:
   ```suggestion
       private static final int BATCH_READ_ROWS = 1024;
   ```



##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java:
##########
@@ -80,6 +86,225 @@
 public class OrcReadStrategy extends AbstractReadStrategy {
     private static final long MIN_SIZE = 16 * 1024;
 
+    private int batchReadRows = 1024;
+
+    /** user can specified row count per split */
+    private long rowCountPerSplitByUser = 0;
+
+    private final long DEFAULT_ROW_COUNT = 100000;
+    private final long DEFAULT_FILE_SIZE_PER_SPLIT = 1024 * 1024 * 30;
+    private long fileSizePerSplitByUser = DEFAULT_FILE_SIZE_PER_SPLIT;
+    private boolean whetherSplitFile = 
BaseSourceConfigOptions.WHETHER_SPLIT_FILE.defaultValue();
+
+    @Override
+    public void setPluginConfig(Config pluginConfig) {
+        super.setPluginConfig(pluginConfig);
+        if 
(pluginConfig.hasPath(BaseSourceConfigOptions.WHETHER_SPLIT_FILE.key())) {
+            whetherSplitFile =
+                    
pluginConfig.getBoolean(BaseSourceConfigOptions.WHETHER_SPLIT_FILE.key());
+        }
+        if 
(pluginConfig.hasPath(BaseSourceConfigOptions.ROW_COUNT_PER_SPLIT.key())) {
+            rowCountPerSplitByUser =
+                    
pluginConfig.getLong(BaseSourceConfigOptions.ROW_COUNT_PER_SPLIT.key());
+        }
+        if 
(pluginConfig.hasPath(BaseSourceConfigOptions.FILE_SIZE_PER_SPLIT.key())) {
+            fileSizePerSplitByUser =
+                    
pluginConfig.getLong(BaseSourceConfigOptions.FILE_SIZE_PER_SPLIT.key());
+        }
+    }
+
+    /**
+     * split a file into many splits: <br>
+     * good: <br>
+     * 1. lower memory occupy. split read end, the memory can recycle. <br>
+     * 2. lower checkpoint ack delay <br>
+     * 3. support fine-grained concurrency. <br>
+     * bad: <br>
+     * 1. cannot guarantee the order of the data. <br>
+     *
+     * @param path file path
+     * @return FileSourceSplit set
+     */
+    @Override
+    public Set<FileSourceSplit> getFileSourceSplits(String path) {
+        if (Boolean.FALSE.equals(checkFileType(path))) {
+            String errorMsg =
+                    String.format(
+                            "This file [%s] is not a orc file, please check 
the format of this file",
+                            path);
+            throw new 
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
+        }
+        Set<FileSourceSplit> fileSourceSplits = new HashSet<>();
+        if (!whetherSplitFile) {
+            fileSourceSplits.add(new FileSourceSplit(path));
+            return fileSourceSplits;
+        }
+        try (Reader reader =
+                hadoopFileSystemProxy.doWithHadoopAuth(
+                        ((configuration, userGroupInformation) -> {
+                            OrcFile.ReaderOptions readerOptions =
+                                    OrcFile.readerOptions(configuration);
+                            return OrcFile.createReader(new Path(path), 
readerOptions);
+                        }))) {
+            log.info(
+                    "path:{}, rowCountPerSplitByUser:{}, 
fileSizePerSplitByUser:{}, fileSize:{}, stripCount:{}, rowCount:{}",
+                    path,
+                    rowCountPerSplitByUser,
+                    fileSizePerSplitByUser,
+                    reader.getContentLength(),
+                    reader.getStripes().size(),
+                    reader.getNumberOfRows());
+            long rowCountPerSplit = rowCountPerSplitByUser;
+            if (rowCountPerSplit <= 0) {
+                // auto split by file size
+                long fileSize = reader.getContentLength();
+                long rowCount = reader.getNumberOfRows();
+                long splitCount =
+                        fileSize
+                                / (fileSizePerSplitByUser <= 0
+                                        ? DEFAULT_FILE_SIZE_PER_SPLIT
+                                        : fileSizePerSplitByUser);
+                if (rowCount == 0 || splitCount == 0) {
+                    log.warn("cannot get file size or row count for orc 
file:{}", path);
+                    rowCountPerSplit = DEFAULT_ROW_COUNT;
+                } else {
+                    rowCountPerSplit = rowCount / splitCount;
+                }
+            }
+            long low = 0;
+            long high = low;
+            int splitCountAll = 0;
+            if (reader.getStripes() == null) {
+                log.warn("cannot get stripes for orc file:{}", path);
+                fileSourceSplits.add(new FileSourceSplit(path));
+                return fileSourceSplits;
+            }
+            for (int i = 0; i < reader.getStripes().size(); i++) {
+                StripeInformation stripe = reader.getStripes().get(i);
+                long leftOverCount = stripe.getNumberOfRows();
+                int splitCount4Strip = 0;
+                long startRow = low;
+                while (leftOverCount > 0) {
+                    if (leftOverCount > rowCountPerSplit) {
+                        high = low + rowCountPerSplit;
+                    } else {
+                        high = low + leftOverCount;
+                    }
+                    FileSourceSplit split = new FileSourceSplit(path, low, 
high - 1);
+                    fileSourceSplits.add(split);
+                    leftOverCount = leftOverCount - rowCountPerSplit;
+                    low = high;
+                    splitCountAll++;
+                    splitCount4Strip++;
+                }
+                log.debug(
+                        "generate split count:{} for this strip:{}, 
startRow:{}, endRow:{},",
+                        splitCount4Strip,
+                        i,
+                        startRow,
+                        high - 1);
+            }
+            log.info("generate split count:{} for this file:{}", 
splitCountAll, path);
+            return fileSourceSplits;
+        } catch (IOException e) {
+            String errorMsg = String.format("Create orc reader for this file 
[%s] failed", path);
+            throw new FileConnectorException(
+                    CommonErrorCodeDeprecated.READER_OPERATION_FAILED, 
errorMsg);
+        }
+    }
+
+    @Override
+    public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
+            throws IOException, FileConnectorException {
+        String path = split.getFilePath();
+        String tableId = split.getTableId();
+        if (split.getMinRowIndex() == null || split.getMaxRowIndex() == null) {
+            log.warn(

Review Comment:
   ```suggestion
               log.debug(
   ```



##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java:
##########
@@ -82,6 +89,127 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
 
     private int[] indexes;
 
+    private boolean whetherSplitFile = 
BaseSourceConfigOptions.WHETHER_SPLIT_FILE.defaultValue();
+
+    @Override
+    public void setPluginConfig(Config pluginConfig) {
+        super.setPluginConfig(pluginConfig);
+        if 
(pluginConfig.hasPath(BaseSourceConfigOptions.WHETHER_SPLIT_FILE.key())) {
+            whetherSplitFile =
+                    
pluginConfig.getBoolean(BaseSourceConfigOptions.WHETHER_SPLIT_FILE.key());
+        }
+    }
+
+    /**
+     * parquet file can only split by file block now. <br>
+     * user cannot customise the split count and size. <br>
+     *
+     * @param path file path
+     * @return splits
+     */
+    @Override
+    public Set<FileSourceSplit> getFileSourceSplits(String path) {
+        Set<FileSourceSplit> fileSourceSplits = new HashSet<>();
+        if (!whetherSplitFile) {
+            fileSourceSplits.add(new FileSourceSplit(path));
+            return fileSourceSplits;
+        }
+        ParquetMetadata metadata;
+        try (ParquetFileReader reader =
+                hadoopFileSystemProxy.doWithHadoopAuth(
+                        ((configuration, userGroupInformation) -> {
+                            HadoopInputFile hadoopInputFile =
+                                    HadoopInputFile.fromPath(new Path(path), 
configuration);
+                            return ParquetFileReader.open(hadoopInputFile);
+                        }))) {
+            metadata = reader.getFooter();
+        } catch (IOException e) {
+            String errorMsg =
+                    String.format("Create parquet reader for this file [%s] 
failed", path);
+            throw new FileConnectorException(
+                    CommonErrorCodeDeprecated.READER_OPERATION_FAILED, 
errorMsg, e);
+        }
+        if (metadata == null || CollectionUtils.isEmpty(metadata.getBlocks())) 
{
+            log.warn("cannot get meta or blocks for path:{}", path);
+            fileSourceSplits.add(new FileSourceSplit(path));
+            return fileSourceSplits;
+        }
+
+        long low = 0;
+        long high = low;
+        long splitCountAll = 0;
+        for (int i = 0; i < metadata.getBlocks().size(); i++) {
+            high = low + metadata.getBlocks().get(i).getCompressedSize();
+            FileSourceSplit split = new FileSourceSplit(path, low, high);
+            fileSourceSplits.add(split);
+            low = high;
+        }
+        log.info("generate parquet split count:{} for this file:{}", 
splitCountAll, path);
+        return fileSourceSplits;
+    }
+
+    /**
+     * todo: <br>
+     * In theory, batch column reading can improve reading performance.
+     */
+    @Override
+    public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)

Review Comment:
   ditto



##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java:
##########
@@ -178,4 +178,23 @@ public class BaseSourceConfigOptions {
                     .enumType(ArchiveCompressFormat.class)
                     .defaultValue(ArchiveCompressFormat.NONE)
                     .withDescription("Archive compression codec");
+
+    public static final Option<Long> FILE_SIZE_PER_SPLIT =
+            Options.key("file_size_per_split")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "split a file into many splits according to file 
size, if row_count_per_split not config. use row_count_per_split prefer.");
+
+    public static final Option<Long> ROW_COUNT_PER_SPLIT =
+            Options.key("row_count_per_split")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("split a file into many splits according 
to row count");
+
+    public static final Option<Boolean> WHETHER_SPLIT_FILE =
+            Options.key("whether_split_file")

Review Comment:
   ```suggestion
               Options.key("split_single_file_to_multiple_splits")
   ```



##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java:
##########
@@ -80,6 +86,225 @@
 public class OrcReadStrategy extends AbstractReadStrategy {
     private static final long MIN_SIZE = 16 * 1024;
 
+    private int batchReadRows = 1024;
+
+    /** user can specified row count per split */
+    private long rowCountPerSplitByUser = 0;
+
+    private final long DEFAULT_ROW_COUNT = 100000;
+    private final long DEFAULT_FILE_SIZE_PER_SPLIT = 1024 * 1024 * 30;
+    private long fileSizePerSplitByUser = DEFAULT_FILE_SIZE_PER_SPLIT;
+    private boolean whetherSplitFile = 
BaseSourceConfigOptions.WHETHER_SPLIT_FILE.defaultValue();
+
+    @Override
+    public void setPluginConfig(Config pluginConfig) {
+        super.setPluginConfig(pluginConfig);
+        if 
(pluginConfig.hasPath(BaseSourceConfigOptions.WHETHER_SPLIT_FILE.key())) {
+            whetherSplitFile =
+                    
pluginConfig.getBoolean(BaseSourceConfigOptions.WHETHER_SPLIT_FILE.key());
+        }
+        if 
(pluginConfig.hasPath(BaseSourceConfigOptions.ROW_COUNT_PER_SPLIT.key())) {
+            rowCountPerSplitByUser =
+                    
pluginConfig.getLong(BaseSourceConfigOptions.ROW_COUNT_PER_SPLIT.key());
+        }
+        if 
(pluginConfig.hasPath(BaseSourceConfigOptions.FILE_SIZE_PER_SPLIT.key())) {
+            fileSizePerSplitByUser =
+                    
pluginConfig.getLong(BaseSourceConfigOptions.FILE_SIZE_PER_SPLIT.key());
+        }
+    }
+
+    /**
+     * split a file into many splits: <br>
+     * good: <br>
+     * 1. lower memory occupy. split read end, the memory can recycle. <br>
+     * 2. lower checkpoint ack delay <br>
+     * 3. support fine-grained concurrency. <br>
+     * bad: <br>
+     * 1. cannot guarantee the order of the data. <br>
+     *
+     * @param path file path
+     * @return FileSourceSplit set
+     */
+    @Override
+    public Set<FileSourceSplit> getFileSourceSplits(String path) {
+        if (Boolean.FALSE.equals(checkFileType(path))) {
+            String errorMsg =
+                    String.format(
+                            "This file [%s] is not a orc file, please check 
the format of this file",
+                            path);
+            throw new 
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
+        }
+        Set<FileSourceSplit> fileSourceSplits = new HashSet<>();
+        if (!whetherSplitFile) {
+            fileSourceSplits.add(new FileSourceSplit(path));
+            return fileSourceSplits;
+        }
+        try (Reader reader =
+                hadoopFileSystemProxy.doWithHadoopAuth(
+                        ((configuration, userGroupInformation) -> {
+                            OrcFile.ReaderOptions readerOptions =
+                                    OrcFile.readerOptions(configuration);
+                            return OrcFile.createReader(new Path(path), 
readerOptions);
+                        }))) {
+            log.info(
+                    "path:{}, rowCountPerSplitByUser:{}, 
fileSizePerSplitByUser:{}, fileSize:{}, stripCount:{}, rowCount:{}",
+                    path,
+                    rowCountPerSplitByUser,
+                    fileSizePerSplitByUser,
+                    reader.getContentLength(),
+                    reader.getStripes().size(),
+                    reader.getNumberOfRows());
+            long rowCountPerSplit = rowCountPerSplitByUser;
+            if (rowCountPerSplit <= 0) {
+                // auto split by file size
+                long fileSize = reader.getContentLength();
+                long rowCount = reader.getNumberOfRows();
+                long splitCount =
+                        fileSize
+                                / (fileSizePerSplitByUser <= 0
+                                        ? DEFAULT_FILE_SIZE_PER_SPLIT
+                                        : fileSizePerSplitByUser);
+                if (rowCount == 0 || splitCount == 0) {
+                    log.warn("cannot get file size or row count for orc 
file:{}", path);
+                    rowCountPerSplit = DEFAULT_ROW_COUNT;
+                } else {
+                    rowCountPerSplit = rowCount / splitCount;
+                }
+            }
+            long low = 0;
+            long high = low;
+            int splitCountAll = 0;
+            if (reader.getStripes() == null) {
+                log.warn("cannot get stripes for orc file:{}", path);
+                fileSourceSplits.add(new FileSourceSplit(path));
+                return fileSourceSplits;
+            }
+            for (int i = 0; i < reader.getStripes().size(); i++) {
+                StripeInformation stripe = reader.getStripes().get(i);
+                long leftOverCount = stripe.getNumberOfRows();
+                int splitCount4Strip = 0;
+                long startRow = low;
+                while (leftOverCount > 0) {
+                    if (leftOverCount > rowCountPerSplit) {
+                        high = low + rowCountPerSplit;
+                    } else {
+                        high = low + leftOverCount;
+                    }
+                    FileSourceSplit split = new FileSourceSplit(path, low, 
high - 1);
+                    fileSourceSplits.add(split);
+                    leftOverCount = leftOverCount - rowCountPerSplit;
+                    low = high;
+                    splitCountAll++;
+                    splitCount4Strip++;
+                }
+                log.debug(
+                        "generate split count:{} for this strip:{}, 
startRow:{}, endRow:{},",
+                        splitCount4Strip,
+                        i,
+                        startRow,
+                        high - 1);
+            }
+            log.info("generate split count:{} for this file:{}", 
splitCountAll, path);
+            return fileSourceSplits;
+        } catch (IOException e) {
+            String errorMsg = String.format("Create orc reader for this file 
[%s] failed", path);
+            throw new FileConnectorException(
+                    CommonErrorCodeDeprecated.READER_OPERATION_FAILED, 
errorMsg);
+        }
+    }
+
+    @Override
+    public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)

Review Comment:
   It would be best to do some refactoring; the new read method 
has a lot in common with the old read method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to