This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 64cdd4d1f [Feature] Enhanced FileSource to support more parameters
feature (#4838)
64cdd4d1f is described below
commit 64cdd4d1fa0a6f22c76a7c7ec8d9c4902438e486
Author: 人生有如两个橘子 <[email protected]>
AuthorDate: Wed Aug 9 21:58:07 2023 +0800
[Feature] Enhanced FileSource to support more parameters feature (#4838)
* Enhanced FileSource to support more parameters feature
* Remove the `limitTotalLine` parameter; its behavior can be replaced by the
`fileInfo` method
* Remove redundant references
---
.../linkis/storage/source/AbstractFileSource.java | 13 +++++++
.../apache/linkis/storage/source/FileSource.java | 4 ++
.../apache/linkis/storage/source/FileSplit.java | 45 +++++++++++++++++++++-
.../filesystem/conf/WorkSpaceConfiguration.java | 7 ++++
.../filesystem/restful/api/FsRestfulApi.java | 12 +++++-
5 files changed, 79 insertions(+), 2 deletions(-)
diff --git
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/AbstractFileSource.java
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/AbstractFileSource.java
index f2eb65ed1..fc4e615b3 100644
---
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/AbstractFileSource.java
+++
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/AbstractFileSource.java
@@ -108,4 +108,17 @@ public abstract class AbstractFileSource implements
FileSource {
.map(fileSplit -> fileSplit.getFileInfo(needToCountRowNumber))
.toArray(Pair[]::new);
}
+
+  /**
+   * Applies a byte budget to every split of this source.
+   *
+   * @param limitBytes byte budget forwarded to each split; {@code null} is treated as 0, which
+   *     FileSplit interprets as "no limit"
+   * @return this source, for chaining
+   */
+  @Override
+  public FileSource limitBytes(Long limitBytes) {
+    // FileSplit#setLimitBytes takes a primitive long; unboxing a null Long would throw an NPE.
+    final long effectiveLimit = limitBytes == null ? 0L : limitBytes;
+    Arrays.stream(fileSplits).forEach(fileSplit -> fileSplit.setLimitBytes(effectiveLimit));
+    return this;
+  }
+
+  /**
+   * Forwards the per-cell character cap to each split of this source.
+   *
+   * @param limitColumnLength maximum characters kept per table cell; 0 or less disables the cap
+   * @return this source, for chaining
+   */
+  @Override
+  public FileSource limitColumnLength(int limitColumnLength) {
+    for (FileSplit split : fileSplits) {
+      split.setLimitColumnLength(limitColumnLength);
+    }
+    return this;
+  }
}
diff --git
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java
index b7bcc8c84..cee72dfcd 100644
---
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java
+++
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java
@@ -80,6 +80,10 @@ public interface FileSource extends Closeable {
return isResultSet(fsPath.getPath());
}
+ /**
+  * Caps the total number of bytes of cell text returned by {@code collect()}.
+  *
+  * <p>In FileSplit, once the running byte total exceeds the budget, the remaining cells of the
+  * current row are blanked and later records are dropped; a value of 0 or less disables the cap.
+  *
+  * @param limitBytes byte budget for the collected cell text
+  * @return the source to keep configuring (AbstractFileSource returns {@code this})
+  */
+ FileSource limitBytes(Long limitBytes);
+
+ /**
+  * Truncates the string form of each table cell to at most {@code limitColumnLength}
+  * characters when records are collected; in FileSplit a value of 0 or less disables truncation.
+  *
+  * @param limitColumnLength maximum characters kept per table cell
+  * @return the source to keep configuring (AbstractFileSource returns {@code this})
+  */
+ FileSource limitColumnLength(int limitColumnLength);
+
/**
* Currently only supports table multi-result sets
*
diff --git
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSplit.java
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSplit.java
index a43b7feb0..3a6c05a54 100644
---
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSplit.java
+++
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSplit.java
@@ -41,6 +41,8 @@ import org.apache.commons.math3.util.Pair;
import java.io.Closeable;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -60,6 +62,8 @@ public class FileSplit implements Closeable {
protected Function<Record, Record> shuffler;
private boolean pageTrigger = false;
protected Map<String, String> params = new HashMap<>();
+  /** Byte budget applied by {@link #collect()}; 0 or less (the default) disables it. */
+  private long limitBytes;
+  /** Per-cell character cap applied to table records; 0 or less (the default) disables it. */
+  private int limitColumnLength;
public FileSplit(FsReader<? extends MetaData, ? extends Record> fsReader) {
this.fsReader = fsReader;
@@ -98,6 +102,14 @@ public class FileSplit implements Closeable {
return totalLine;
}
+  /**
+   * Sets the byte budget used by {@link #collect()}.
+   *
+   * @param limitBytes maximum cumulative bytes of collected cell text; 0 or less disables it
+   */
+  public void setLimitBytes(long limitBytes) {
+    this.limitBytes = limitBytes;
+  }
+
+  /**
+   * Sets the maximum number of characters ({@code String#length} units) kept per table cell
+   * when records are converted to strings.
+   *
+   * @param limitColumnLength maximum characters per cell; 0 or less disables truncation
+   */
+  public void setLimitColumnLength(int limitColumnLength) {
+    this.limitColumnLength = limitColumnLength;
+  }
+
public <M> M whileLoop(Function<MetaData, M> metaDataFunction,
Consumer<Record> recordConsumer) {
M m = null;
try {
@@ -222,16 +234,47 @@ public class FileSplit implements Closeable {
+  /**
+   * Reads all records of this split and returns them as string cells.
+   *
+   * <p>When {@code limitBytes > 0}, the UTF-8 size of each cell is added to a running total.
+   * The cell that pushes the total past {@code limitBytes}, and every later cell of the same
+   * row, are replaced with {@code ""}; all records after that row are dropped.
+   *
+   * @return a pair of the value produced by {@code collectMetaData} and the collected rows
+   */
public Pair<Object, List<String[]>> collect() {
List<String[]> recordList = new ArrayList<>();
+    // Mutable holders so the record-consumer lambda below can update them.
+    final AtomicLong tmpBytes = new AtomicLong(0L);
+    final AtomicBoolean overFlag = new AtomicBoolean(false);
Object metaData =
whileLoop(
collectMetaData -> collectMetaData(collectMetaData),
- r -> recordList.add(collectRecord(r)));
+            r -> {
+              if (overFlag.get()) {
+                // Byte budget already exhausted: drop this record.
+                // NOTE(review): whileLoop keeps invoking this consumer; stopping the read early
+                // would avoid scanning the rest of a large file.
+                return;
+              }
+              String[] arr = collectRecord(r);
+              if (limitBytes > 0) {
+                for (int i = 0; i < arr.length; i++) {
+                  if (!overFlag.get()) {
+                    // Count UTF-8 bytes so the budget does not depend on the JVM default
+                    // charset; null cells contribute nothing instead of throwing an NPE.
+                    String cell = arr[i];
+                    long cellBytes =
+                        cell == null ? 0L : cell.getBytes(StandardCharsets.UTF_8).length;
+                    if (tmpBytes.addAndGet(cellBytes) > limitBytes) {
+                      overFlag.set(true);
+                    }
+                  }
+                  if (overFlag.get()) {
+                    arr[i] = "";
+                  }
+                }
+              }
+              recordList.add(arr);
+            });
return new Pair<>(metaData, recordList);
}
public String[] collectRecord(Record record) {
if (record instanceof TableRecord) {
TableRecord tableRecord = (TableRecord) record;
+ if (limitColumnLength > 0) {
+ return Arrays.stream(tableRecord.row)
+ .map(
+ obj -> {
+ String col = DataType.valueToString(obj);
+ if (col.length() > limitColumnLength) {
+ return col.substring(0, limitColumnLength);
+ } else {
+ return col;
+ }
+ })
+ .toArray(String[]::new);
+ }
return
Arrays.stream(tableRecord.row).map(DataType::valueToString).toArray(String[]::new);
} else if (record instanceof LineRecord) {
LineRecord lineRecord = (LineRecord) record;
diff --git
a/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/conf/WorkSpaceConfiguration.java
b/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/conf/WorkSpaceConfiguration.java
index b3eeba135..f7ea5086c 100644
---
a/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/conf/WorkSpaceConfiguration.java
+++
b/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/conf/WorkSpaceConfiguration.java
@@ -61,6 +61,13 @@ public class WorkSpaceConfiguration {
public static final CommonVars<Boolean> ENABLE_USER_GROUP =
CommonVars$.MODULE$.apply("linkis.os.user.group.enabled", true);
+  /**
+   * Upper bound applied by FsRestfulApi to the {@code limitBytes} request parameter
+   * (via {@code Math.min}); defaults to 63 MiB.
+   */
+  public static final CommonVars<Long> FILESYSTEM_LIMIT_BYTES =
+      CommonVars$.MODULE$.apply("linkis.filesystem.limit.bytes", 63L * 1024L * 1024L);
+
+  /**
+   * Upper bound applied by FsRestfulApi to the {@code limitColumnLength} request parameter
+   * (via {@code Math.min}); defaults to 2000 characters per cell.
+   */
+  public static final CommonVars<Integer> FILESYSTEM_LIMIT_COLUMN_LENGTH =
+      CommonVars$.MODULE$.apply("linkis.filesystem.limit.column.length", 2000);
+
public static final ExecutorService executorService =
new ThreadPoolExecutor(
FILESYSTEM_FS_THREAD_NUM.getValue(),
diff --git
a/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
b/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
index 5a7316d15..ae7e19106 100644
---
a/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
+++
b/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
@@ -560,7 +560,9 @@ public class FsRestfulApi {
@RequestParam(value = "path", required = false) String path,
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "pageSize", defaultValue = "5000") Integer
pageSize,
- @RequestParam(value = "charset", defaultValue = "utf-8") String charset)
+ @RequestParam(value = "charset", defaultValue = "utf-8") String charset,
+ @RequestParam(value = "limitBytes", defaultValue = "0") Long limitBytes,
+ @RequestParam(value = "limitColumnLength", defaultValue = "0") Integer
limitColumnLength)
throws IOException, WorkSpaceException {
Message message = Message.ok();
@@ -583,6 +585,14 @@ public class FsRestfulApi {
if (FileSource.isResultSet(fsPath.getPath())) {
fileSource = fileSource.page(page, pageSize);
}
+ if (limitBytes > 0) {
+ fileSource = fileSource.limitBytes(Math.min(limitBytes,
FILESYSTEM_LIMIT_BYTES.getValue()));
+ }
+ if (limitColumnLength > 0) {
+ fileSource =
+ fileSource.limitColumnLength(
+ Math.min(limitColumnLength,
FILESYSTEM_LIMIT_COLUMN_LENGTH.getValue()));
+ }
Pair<Object, List<String[]>> result = fileSource.collect()[0];
IOUtils.closeQuietly(fileSource);
message.data("metadata", result.getFirst()).data("fileContent",
result.getSecond());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]