This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-streamloader.git
The following commit(s) were added to refs/heads/master by this push:
new fcc6652 [Feature] Support for Custom Line Break Characters (#27)
fcc6652 is described below
commit fcc6652c7183b9b0171ff3673936bb333989fe92
Author: cfbber <[email protected]>
AuthorDate: Wed Feb 26 22:21:34 2025 +0800
[Feature] Support for Custom Line Break Characters (#27)
---
main.go | 38 +++++++++++++++++++++++++++++++++++---
reader/reader.go | 11 ++++++++---
2 files changed, 43 insertions(+), 6 deletions(-)
diff --git a/main.go b/main.go
index ecff344..38d8799 100644
--- a/main.go
+++ b/main.go
@@ -22,6 +22,7 @@ import (
"fmt"
"math"
"os"
+ "regexp"
"strconv"
"strings"
"sync"
@@ -81,8 +82,8 @@ var (
retryInfo map[int]int
showVersion bool
queueSize int
-
- bufferPool = sync.Pool{
+ lineDelimiter byte = '\n'
+ bufferPool = sync.Pool{
New: func() interface{} {
return make([]byte, 0, bufferSize)
},
@@ -191,6 +192,24 @@ func initFlags() {
utils.InitLog(logLevel)
}
// hexEscapePattern matches a single \xNN escape with exactly two hex digits.
// It is compiled once at package level instead of on every call.
var hexEscapePattern = regexp.MustCompile(`\\x([0-9A-Fa-f]{2})`)

// restoreHexEscapes replaces every \xNN escape in s1 with the single raw byte
// 0xNN and leaves all other text untouched. As a special case, the exact
// two-character input `\n` (backslash followed by 'n') is returned as a
// newline.
//
// Every escape decodes to exactly one byte, across the full \x00-\xFF range.
// Converting through a rune would UTF-8-encode values >= 0x80 as two bytes,
// and callers needing a one-byte result (e.g. the line_delimiter check) would
// then reject them.
//
// The error result is currently always nil. It is kept so callers can handle
// decoding failures uniformly if stricter validation is added later.
func restoreHexEscapes(s1 string) (string, error) {
	if s1 == `\n` {
		return "\n", nil
	}

	restored := hexEscapePattern.ReplaceAllStringFunc(s1, func(match string) string {
		// match is `\x` followed by two hex digits; bitSize 8 keeps it in byte range.
		value, err := strconv.ParseUint(match[2:], 16, 8)
		if err != nil {
			// Cannot happen given the pattern; keep the escape verbatim if it does.
			return match
		}
		return string([]byte{byte(value)})
	})
	return restored, nil
}
+
//go:generate go run gen_version.go
func paramCheck() {
if showVersion {
@@ -253,6 +272,19 @@ func paramCheck() {
if strings.ToLower(kv[0]) == "format" &&
strings.ToLower(kv[1]) != "csv" {
enableConcurrency = false
}
+
+ if strings.ToLower(kv[0]) == "line_delimiter" {
+
+ restored, err := restoreHexEscapes(kv[1])
+ if err != nil || len(restored) != 1 {
+ log.Errorf("line_delimiter invalid:
%s", kv[1])
+ os.Exit(1)
+ } else {
+ lineDelimiter = restored[0]
+ }
+
+ }
+
if len(kv) > 2 {
headers[kv[0]] = strings.Join(kv[1:], ":")
} else {
@@ -369,7 +401,7 @@ func main() {
streamLoad.Load(workers, maxRowsPerTask, maxBytesPerTask,
&retryInfo)
reporter.Report()
defer reporter.CloseWait()
- reader.Read(reporter, workers, maxBytesPerTask, &retryInfo,
loadResp, retryCount)
+ reader.Read(reporter, workers, maxBytesPerTask, &retryInfo,
loadResp, retryCount, lineDelimiter)
reader.Close()
streamLoad.Wait(loadInfo, retryCount, &retryInfo, startTime)
diff --git a/reader/reader.go b/reader/reader.go
index 2dcd2a5..fa6563f 100644
--- a/reader/reader.go
+++ b/reader/reader.go
@@ -108,7 +108,8 @@ func NewFileReader(filePaths string, batchRows int,
batchBytes int, bufferSize i
}
// Read File
-func (f *FileReader) Read(reporter *report.Reporter, workers int,
maxBytesPerTask int, retryInfo *map[int]int, loadResp *loader.Resp, retryCount
int) {
+func (f *FileReader) Read(reporter *report.Reporter, workers int,
maxBytesPerTask int, retryInfo *map[int]int,
+ loadResp *loader.Resp, retryCount int, lineDelimiter byte) {
index := 0
data := f.pool.Get().([]byte)
count := f.batchRows
@@ -125,16 +126,20 @@ func (f *FileReader) Read(reporter *report.Reporter,
workers int, maxBytesPerTas
for _, file := range f.files {
loadResp.LoadFiles = append(loadResp.LoadFiles, file.Name())
reader := bufio.NewReaderSize(file, f.bufferSize)
+
for {
if atomic.LoadUint64(&reporter.FinishedWorkers) ==
atomic.LoadUint64(&reporter.TotalWorkers) {
return
}
- line, err := reader.ReadBytes('\n')
- if err == io.EOF {
+ line, err := reader.ReadBytes(lineDelimiter)
+ if err == io.EOF && len(line) == 0 {
file.Close()
break
} else if err != nil {
log.Errorf("Read file failed, error message:
%v, before retrying, we suggest:\n1.Check the input data files and fix if there
is any problem.\n2.Do select count(*) to check whether data is partially
loaded.\n3.If the data is partially loaded and duplication is unacceptable,
consider dropping the table (with caution that all data in the table will be
lost) and retry.\n4.Otherwise, just retry.\n", err)
+ if len(line) != 0 {
+ log.Error("5.When using a specified
line delimiter, the file must end with that delimiter.")
+ }
os.Exit(1)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]