sohurdc commented on code in PR #8507:
URL: https://github.com/apache/seatunnel/pull/8507#discussion_r1918369660


##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java:
##########
@@ -80,6 +86,225 @@
 public class OrcReadStrategy extends AbstractReadStrategy {
     private static final long MIN_SIZE = 16 * 1024;
 
+    private int batchReadRows = 1024;
+
+    /** user can specified row count per split */
+    private long rowCountPerSplitByUser = 0;
+
+    private final long DEFAULT_ROW_COUNT = 100000;
+    private final long DEFAULT_FILE_SIZE_PER_SPLIT = 1024 * 1024 * 30;
+    private long fileSizePerSplitByUser = DEFAULT_FILE_SIZE_PER_SPLIT;
+    private boolean whetherSplitFile = 
BaseSourceConfigOptions.WHETHER_SPLIT_FILE.defaultValue();
+
+    @Override
+    public void setPluginConfig(Config pluginConfig) {
+        super.setPluginConfig(pluginConfig);
+        if 
(pluginConfig.hasPath(BaseSourceConfigOptions.WHETHER_SPLIT_FILE.key())) {
+            whetherSplitFile =
+                    
pluginConfig.getBoolean(BaseSourceConfigOptions.WHETHER_SPLIT_FILE.key());
+        }
+        if 
(pluginConfig.hasPath(BaseSourceConfigOptions.ROW_COUNT_PER_SPLIT.key())) {
+            rowCountPerSplitByUser =
+                    
pluginConfig.getLong(BaseSourceConfigOptions.ROW_COUNT_PER_SPLIT.key());
+        }
+        if 
(pluginConfig.hasPath(BaseSourceConfigOptions.FILE_SIZE_PER_SPLIT.key())) {
+            fileSizePerSplitByUser =
+                    
pluginConfig.getLong(BaseSourceConfigOptions.FILE_SIZE_PER_SPLIT.key());
+        }
+    }
+
+    /**
+     * split a file into many splits: <br>
+     * good: <br>
+     * 1. lower memory occupy. split read end, the memory can recycle. <br>
+     * 2. lower checkpoint ack delay <br>
+     * 3. support fine-grained concurrency. <br>
+     * bad: <br>
+     * 1. cannot guarantee the order of the data. <br>
+     *
+     * @param path file path
+     * @return FileSourceSplit set
+     */
+    @Override
+    public Set<FileSourceSplit> getFileSourceSplits(String path) {
+        if (Boolean.FALSE.equals(checkFileType(path))) {
+            String errorMsg =
+                    String.format(
+                            "This file [%s] is not a orc file, please check 
the format of this file",
+                            path);
+            throw new 
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
+        }
+        Set<FileSourceSplit> fileSourceSplits = new HashSet<>();
+        if (!whetherSplitFile) {
+            fileSourceSplits.add(new FileSourceSplit(path));
+            return fileSourceSplits;
+        }
+        try (Reader reader =
+                hadoopFileSystemProxy.doWithHadoopAuth(
+                        ((configuration, userGroupInformation) -> {
+                            OrcFile.ReaderOptions readerOptions =
+                                    OrcFile.readerOptions(configuration);
+                            return OrcFile.createReader(new Path(path), 
readerOptions);
+                        }))) {
+            log.info(
+                    "path:{}, rowCountPerSplitByUser:{}, 
fileSizePerSplitByUser:{}, fileSize:{}, stripCount:{}, rowCount:{}",
+                    path,
+                    rowCountPerSplitByUser,
+                    fileSizePerSplitByUser,
+                    reader.getContentLength(),
+                    reader.getStripes().size(),
+                    reader.getNumberOfRows());
+            long rowCountPerSplit = rowCountPerSplitByUser;
+            if (rowCountPerSplit <= 0) {
+                // auto split by file size
+                long fileSize = reader.getContentLength();
+                long rowCount = reader.getNumberOfRows();
+                long splitCount =
+                        fileSize
+                                / (fileSizePerSplitByUser <= 0
+                                        ? DEFAULT_FILE_SIZE_PER_SPLIT
+                                        : fileSizePerSplitByUser);
+                if (rowCount == 0 || splitCount == 0) {
+                    log.warn("cannot get file size or row count for orc 
file:{}", path);
+                    rowCountPerSplit = DEFAULT_ROW_COUNT;
+                } else {
+                    rowCountPerSplit = rowCount / splitCount;
+                }
+            }
+            long low = 0;
+            long high = low;
+            int splitCountAll = 0;
+            if (reader.getStripes() == null) {
+                log.warn("cannot get stripes for orc file:{}", path);
+                fileSourceSplits.add(new FileSourceSplit(path));
+                return fileSourceSplits;
+            }
+            for (int i = 0; i < reader.getStripes().size(); i++) {
+                StripeInformation stripe = reader.getStripes().get(i);
+                long leftOverCount = stripe.getNumberOfRows();
+                int splitCount4Strip = 0;
+                long startRow = low;
+                while (leftOverCount > 0) {
+                    if (leftOverCount > rowCountPerSplit) {
+                        high = low + rowCountPerSplit;
+                    } else {
+                        high = low + leftOverCount;
+                    }
+                    FileSourceSplit split = new FileSourceSplit(path, low, 
high - 1);
+                    fileSourceSplits.add(split);
+                    leftOverCount = leftOverCount - rowCountPerSplit;
+                    low = high;
+                    splitCountAll++;
+                    splitCount4Strip++;
+                }
+                log.debug(
+                        "generate split count:{} for this strip:{}, 
startRow:{}, endRow:{},",
+                        splitCount4Strip,
+                        i,
+                        startRow,
+                        high - 1);
+            }
+            log.info("generate split count:{} for this file:{}", 
splitCountAll, path);
+            return fileSourceSplits;
+        } catch (IOException e) {
+            String errorMsg = String.format("Create orc reader for this file 
[%s] failed", path);
+            throw new FileConnectorException(
+                    CommonErrorCodeDeprecated.READER_OPERATION_FAILED, 
errorMsg);
+        }
+    }
+
+    @Override
+    public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to