Hisoka-X commented on code in PR #8507:
URL: https://github.com/apache/seatunnel/pull/8507#discussion_r1915904441
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java:
##########
@@ -80,6 +86,225 @@
public class OrcReadStrategy extends AbstractReadStrategy {
private static final long MIN_SIZE = 16 * 1024;
+ private int batchReadRows = 1024;
Review Comment:
```suggestion
private static final int BATCH_READ_ROWS = 1024;
```
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java:
##########
@@ -80,6 +86,225 @@
public class OrcReadStrategy extends AbstractReadStrategy {
private static final long MIN_SIZE = 16 * 1024;
+ private int batchReadRows = 1024;
+
+ /** user can specified row count per split */
+ private long rowCountPerSplitByUser = 0;
+
+ private final long DEFAULT_ROW_COUNT = 100000;
+ private final long DEFAULT_FILE_SIZE_PER_SPLIT = 1024 * 1024 * 30;
+ private long fileSizePerSplitByUser = DEFAULT_FILE_SIZE_PER_SPLIT;
+ private boolean whetherSplitFile =
BaseSourceConfigOptions.WHETHER_SPLIT_FILE.defaultValue();
+
+ @Override
+ public void setPluginConfig(Config pluginConfig) {
+ super.setPluginConfig(pluginConfig);
+ if
(pluginConfig.hasPath(BaseSourceConfigOptions.WHETHER_SPLIT_FILE.key())) {
+ whetherSplitFile =
+
pluginConfig.getBoolean(BaseSourceConfigOptions.WHETHER_SPLIT_FILE.key());
+ }
+ if
(pluginConfig.hasPath(BaseSourceConfigOptions.ROW_COUNT_PER_SPLIT.key())) {
+ rowCountPerSplitByUser =
+
pluginConfig.getLong(BaseSourceConfigOptions.ROW_COUNT_PER_SPLIT.key());
+ }
+ if
(pluginConfig.hasPath(BaseSourceConfigOptions.FILE_SIZE_PER_SPLIT.key())) {
+ fileSizePerSplitByUser =
+
pluginConfig.getLong(BaseSourceConfigOptions.FILE_SIZE_PER_SPLIT.key());
+ }
+ }
+
+ /**
+ * split a file into many splits: <br>
+ * good: <br>
+ * 1. lower memory occupy. split read end, the memory can recycle. <br>
+ * 2. lower checkpoint ack delay <br>
+ * 3. support fine-grained concurrency. <br>
+ * bad: <br>
+ * 1. cannot guarantee the order of the data. <br>
+ *
+ * @param path file path
+ * @return FileSourceSplit set
+ */
+ @Override
+ public Set<FileSourceSplit> getFileSourceSplits(String path) {
+ if (Boolean.FALSE.equals(checkFileType(path))) {
+ String errorMsg =
+ String.format(
+ "This file [%s] is not a orc file, please check
the format of this file",
+ path);
+ throw new
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
+ }
+ Set<FileSourceSplit> fileSourceSplits = new HashSet<>();
+ if (!whetherSplitFile) {
+ fileSourceSplits.add(new FileSourceSplit(path));
+ return fileSourceSplits;
+ }
+ try (Reader reader =
+ hadoopFileSystemProxy.doWithHadoopAuth(
+ ((configuration, userGroupInformation) -> {
+ OrcFile.ReaderOptions readerOptions =
+ OrcFile.readerOptions(configuration);
+ return OrcFile.createReader(new Path(path),
readerOptions);
+ }))) {
+ log.info(
+ "path:{}, rowCountPerSplitByUser:{},
fileSizePerSplitByUser:{}, fileSize:{}, stripCount:{}, rowCount:{}",
+ path,
+ rowCountPerSplitByUser,
+ fileSizePerSplitByUser,
+ reader.getContentLength(),
+ reader.getStripes().size(),
+ reader.getNumberOfRows());
+ long rowCountPerSplit = rowCountPerSplitByUser;
+ if (rowCountPerSplit <= 0) {
+ // auto split by file size
+ long fileSize = reader.getContentLength();
+ long rowCount = reader.getNumberOfRows();
+ long splitCount =
+ fileSize
+ / (fileSizePerSplitByUser <= 0
+ ? DEFAULT_FILE_SIZE_PER_SPLIT
+ : fileSizePerSplitByUser);
+ if (rowCount == 0 || splitCount == 0) {
+ log.warn("cannot get file size or row count for orc
file:{}", path);
+ rowCountPerSplit = DEFAULT_ROW_COUNT;
+ } else {
+ rowCountPerSplit = rowCount / splitCount;
+ }
+ }
+ long low = 0;
+ long high = low;
+ int splitCountAll = 0;
+ if (reader.getStripes() == null) {
+ log.warn("cannot get stripes for orc file:{}", path);
+ fileSourceSplits.add(new FileSourceSplit(path));
+ return fileSourceSplits;
+ }
+ for (int i = 0; i < reader.getStripes().size(); i++) {
+ StripeInformation stripe = reader.getStripes().get(i);
+ long leftOverCount = stripe.getNumberOfRows();
+ int splitCount4Strip = 0;
+ long startRow = low;
+ while (leftOverCount > 0) {
+ if (leftOverCount > rowCountPerSplit) {
+ high = low + rowCountPerSplit;
+ } else {
+ high = low + leftOverCount;
+ }
+ FileSourceSplit split = new FileSourceSplit(path, low,
high - 1);
+ fileSourceSplits.add(split);
+ leftOverCount = leftOverCount - rowCountPerSplit;
+ low = high;
+ splitCountAll++;
+ splitCount4Strip++;
+ }
+ log.debug(
+ "generate split count:{} for this strip:{},
startRow:{}, endRow:{},",
+ splitCount4Strip,
+ i,
+ startRow,
+ high - 1);
+ }
+ log.info("generate split count:{} for this file:{}",
splitCountAll, path);
+ return fileSourceSplits;
+ } catch (IOException e) {
+ String errorMsg = String.format("Create orc reader for this file
[%s] failed", path);
+ throw new FileConnectorException(
+ CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
errorMsg);
+ }
+ }
+
+ @Override
+ public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
+ throws IOException, FileConnectorException {
+ String path = split.getFilePath();
+ String tableId = split.getTableId();
+ if (split.getMinRowIndex() == null || split.getMaxRowIndex() == null) {
+ log.warn(
Review Comment:
```suggestion
log.debug(
```
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java:
##########
@@ -82,6 +89,127 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
private int[] indexes;
+ private boolean whetherSplitFile =
BaseSourceConfigOptions.WHETHER_SPLIT_FILE.defaultValue();
+
+ @Override
+ public void setPluginConfig(Config pluginConfig) {
+ super.setPluginConfig(pluginConfig);
+ if
(pluginConfig.hasPath(BaseSourceConfigOptions.WHETHER_SPLIT_FILE.key())) {
+ whetherSplitFile =
+
pluginConfig.getBoolean(BaseSourceConfigOptions.WHETHER_SPLIT_FILE.key());
+ }
+ }
+
+ /**
+ * parquet file can only split by file block now. <br>
+ * user cannot customise the split count and size. <br>
+ *
+ * @param path file path
+ * @return splits
+ */
+ @Override
+ public Set<FileSourceSplit> getFileSourceSplits(String path) {
+ Set<FileSourceSplit> fileSourceSplits = new HashSet<>();
+ if (!whetherSplitFile) {
+ fileSourceSplits.add(new FileSourceSplit(path));
+ return fileSourceSplits;
+ }
+ ParquetMetadata metadata;
+ try (ParquetFileReader reader =
+ hadoopFileSystemProxy.doWithHadoopAuth(
+ ((configuration, userGroupInformation) -> {
+ HadoopInputFile hadoopInputFile =
+ HadoopInputFile.fromPath(new Path(path),
configuration);
+ return ParquetFileReader.open(hadoopInputFile);
+ }))) {
+ metadata = reader.getFooter();
+ } catch (IOException e) {
+ String errorMsg =
+ String.format("Create parquet reader for this file [%s]
failed", path);
+ throw new FileConnectorException(
+ CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
errorMsg, e);
+ }
+ if (metadata == null || CollectionUtils.isEmpty(metadata.getBlocks()))
{
+ log.warn("cannot get meta or blocks for path:{}", path);
+ fileSourceSplits.add(new FileSourceSplit(path));
+ return fileSourceSplits;
+ }
+
+ long low = 0;
+ long high = low;
+ long splitCountAll = 0;
+ for (int i = 0; i < metadata.getBlocks().size(); i++) {
+ high = low + metadata.getBlocks().get(i).getCompressedSize();
+ FileSourceSplit split = new FileSourceSplit(path, low, high);
+ fileSourceSplits.add(split);
+ low = high;
+ }
+ log.info("generate parquet split count:{} for this file:{}",
splitCountAll, path);
+ return fileSourceSplits;
+ }
+
+ /**
+ * todo: <br>
+ * In theory, batch column reading can improve reading performance.
+ */
+ @Override
+ public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
Review Comment:
ditto
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java:
##########
@@ -178,4 +178,23 @@ public class BaseSourceConfigOptions {
.enumType(ArchiveCompressFormat.class)
.defaultValue(ArchiveCompressFormat.NONE)
.withDescription("Archive compression codec");
+
+ public static final Option<Long> FILE_SIZE_PER_SPLIT =
+ Options.key("file_size_per_split")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "split a file into many splits according to file
size, if row_count_per_split not config. use row_count_per_split prefer.");
+
+ public static final Option<Long> ROW_COUNT_PER_SPLIT =
+ Options.key("row_count_per_split")
+ .longType()
+ .noDefaultValue()
+ .withDescription("split a file into many splits according
to row count");
+
+ public static final Option<Boolean> WHETHER_SPLIT_FILE =
+ Options.key("whether_split_file")
Review Comment:
```suggestion
Options.key("split_single_file_to_multiple_splits")
```
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java:
##########
@@ -80,6 +86,225 @@
public class OrcReadStrategy extends AbstractReadStrategy {
private static final long MIN_SIZE = 16 * 1024;
+ private int batchReadRows = 1024;
+
+ /** user can specified row count per split */
+ private long rowCountPerSplitByUser = 0;
+
+ private final long DEFAULT_ROW_COUNT = 100000;
+ private final long DEFAULT_FILE_SIZE_PER_SPLIT = 1024 * 1024 * 30;
+ private long fileSizePerSplitByUser = DEFAULT_FILE_SIZE_PER_SPLIT;
+ private boolean whetherSplitFile =
BaseSourceConfigOptions.WHETHER_SPLIT_FILE.defaultValue();
+
+ @Override
+ public void setPluginConfig(Config pluginConfig) {
+ super.setPluginConfig(pluginConfig);
+ if
(pluginConfig.hasPath(BaseSourceConfigOptions.WHETHER_SPLIT_FILE.key())) {
+ whetherSplitFile =
+
pluginConfig.getBoolean(BaseSourceConfigOptions.WHETHER_SPLIT_FILE.key());
+ }
+ if
(pluginConfig.hasPath(BaseSourceConfigOptions.ROW_COUNT_PER_SPLIT.key())) {
+ rowCountPerSplitByUser =
+
pluginConfig.getLong(BaseSourceConfigOptions.ROW_COUNT_PER_SPLIT.key());
+ }
+ if
(pluginConfig.hasPath(BaseSourceConfigOptions.FILE_SIZE_PER_SPLIT.key())) {
+ fileSizePerSplitByUser =
+
pluginConfig.getLong(BaseSourceConfigOptions.FILE_SIZE_PER_SPLIT.key());
+ }
+ }
+
+ /**
+ * split a file into many splits: <br>
+ * good: <br>
+ * 1. lower memory occupy. split read end, the memory can recycle. <br>
+ * 2. lower checkpoint ack delay <br>
+ * 3. support fine-grained concurrency. <br>
+ * bad: <br>
+ * 1. cannot guarantee the order of the data. <br>
+ *
+ * @param path file path
+ * @return FileSourceSplit set
+ */
+ @Override
+ public Set<FileSourceSplit> getFileSourceSplits(String path) {
+ if (Boolean.FALSE.equals(checkFileType(path))) {
+ String errorMsg =
+ String.format(
+ "This file [%s] is not a orc file, please check
the format of this file",
+ path);
+ throw new
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
+ }
+ Set<FileSourceSplit> fileSourceSplits = new HashSet<>();
+ if (!whetherSplitFile) {
+ fileSourceSplits.add(new FileSourceSplit(path));
+ return fileSourceSplits;
+ }
+ try (Reader reader =
+ hadoopFileSystemProxy.doWithHadoopAuth(
+ ((configuration, userGroupInformation) -> {
+ OrcFile.ReaderOptions readerOptions =
+ OrcFile.readerOptions(configuration);
+ return OrcFile.createReader(new Path(path),
readerOptions);
+ }))) {
+ log.info(
+ "path:{}, rowCountPerSplitByUser:{},
fileSizePerSplitByUser:{}, fileSize:{}, stripCount:{}, rowCount:{}",
+ path,
+ rowCountPerSplitByUser,
+ fileSizePerSplitByUser,
+ reader.getContentLength(),
+ reader.getStripes().size(),
+ reader.getNumberOfRows());
+ long rowCountPerSplit = rowCountPerSplitByUser;
+ if (rowCountPerSplit <= 0) {
+ // auto split by file size
+ long fileSize = reader.getContentLength();
+ long rowCount = reader.getNumberOfRows();
+ long splitCount =
+ fileSize
+ / (fileSizePerSplitByUser <= 0
+ ? DEFAULT_FILE_SIZE_PER_SPLIT
+ : fileSizePerSplitByUser);
+ if (rowCount == 0 || splitCount == 0) {
+ log.warn("cannot get file size or row count for orc
file:{}", path);
+ rowCountPerSplit = DEFAULT_ROW_COUNT;
+ } else {
+ rowCountPerSplit = rowCount / splitCount;
+ }
+ }
+ long low = 0;
+ long high = low;
+ int splitCountAll = 0;
+ if (reader.getStripes() == null) {
+ log.warn("cannot get stripes for orc file:{}", path);
+ fileSourceSplits.add(new FileSourceSplit(path));
+ return fileSourceSplits;
+ }
+ for (int i = 0; i < reader.getStripes().size(); i++) {
+ StripeInformation stripe = reader.getStripes().get(i);
+ long leftOverCount = stripe.getNumberOfRows();
+ int splitCount4Strip = 0;
+ long startRow = low;
+ while (leftOverCount > 0) {
+ if (leftOverCount > rowCountPerSplit) {
+ high = low + rowCountPerSplit;
+ } else {
+ high = low + leftOverCount;
+ }
+ FileSourceSplit split = new FileSourceSplit(path, low,
high - 1);
+ fileSourceSplits.add(split);
+ leftOverCount = leftOverCount - rowCountPerSplit;
+ low = high;
+ splitCountAll++;
+ splitCount4Strip++;
+ }
+ log.debug(
+ "generate split count:{} for this strip:{},
startRow:{}, endRow:{},",
+ splitCount4Strip,
+ i,
+ startRow,
+ high - 1);
+ }
+ log.info("generate split count:{} for this file:{}",
splitCountAll, path);
+ return fileSourceSplits;
+ } catch (IOException e) {
+ String errorMsg = String.format("Create orc reader for this file
[%s] failed", path);
+ throw new FileConnectorException(
+ CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
errorMsg);
+ }
+ }
+
+ @Override
+ public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
Review Comment:
It would be best to do some refactoring here: the new read method has a lot
in common with the old read method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]