nikie commented on a change in pull request #15775:
URL: https://github.com/apache/beam/pull/15775#discussion_r735008085
##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -257,17 +259,37 @@ def _find_separator_bounds(self, file_to_read,
read_buffer):
# Using find() here is more efficient than a linear scan
# of the byte array.
- next_lf = read_buffer.data.find(self._delimiter, current_pos)
+ next_lf = read_buffer.data.find(delimiter, current_pos)
if next_lf >= 0:
- if self._delimiter == b'\n' and read_buffer.data[next_lf -
- 1:next_lf] == b'\r':
+ if self._delimiter is None and delimiter == b'\n' \
+ and read_buffer.data[next_lf - 1:next_lf] == b'\r':
+ # Default delimiter
Review comment:
`# Default b'\n' or user defined delimiter.`
##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -257,17 +259,37 @@ def _find_separator_bounds(self, file_to_read,
read_buffer):
# Using find() here is more efficient than a linear scan
# of the byte array.
- next_lf = read_buffer.data.find(self._delimiter, current_pos)
+ next_lf = read_buffer.data.find(delimiter, current_pos)
if next_lf >= 0:
- if self._delimiter == b'\n' and read_buffer.data[next_lf -
- 1:next_lf] == b'\r':
+ if self._delimiter is None and delimiter == b'\n' \
+ and read_buffer.data[next_lf - 1:next_lf] == b'\r':
+ # Default delimiter
# Found a '\r\n'. Accepting that as the next separator.
return (next_lf - 1, next_lf + 1)
else:
+ # User defined delimiter
# Found a delimiter. Accepting that as the next separator.
return (next_lf, next_lf + delimiter_len)
+ elif read_buffer.data.find(delimiter[0], current_pos) >= 0:
Review comment:
I am sorry, but this pipeline prints 1 instead of 3:
```
from tempfile import NamedTemporaryFile

import apache_beam as beam

READ_BUFFER_SIZE = 8192
with NamedTemporaryFile("wb") as temp_file:
  temp_file.write(b'\r' + b'a' * (READ_BUFFER_SIZE - 2) + b'\r\n')
  temp_file.write(b'\r' + b'a' * (READ_BUFFER_SIZE - 3) + b'\r\n')
  temp_file.write(b'\r' + b'a' * (READ_BUFFER_SIZE - 4) + b'\r\n')
  temp_file.flush()
  with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.io.ReadFromText(
            file_pattern=temp_file.name,
            delimiter=b'\r\n')
        | beam.combiners.Count.Globally()
        | beam.Map(print)
    )
```
This happens because we look up the start of the delimiter from the current
position and give up if that occurrence is not a true delimiter start.
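The false start can be reproduced with plain `bytes.find`, outside Beam (a standalone sketch with a made-up 16-byte buffer, not the actual reader code): with delimiter `b'\r\n'`, a lone `b'\r'` early in the buffer matches `delimiter[0]` even though the buffer contains no complete delimiter:
```
# Standalone illustration, not Beam code; the buffer contents are made up.
delimiter = b'\r\n'
# A lone '\r' at index 0 is a false start; the buffer ends mid-delimiter.
data = b'\r' + b'a' * 14 + b'\r'

full_match = data.find(delimiter, 0)      # no complete delimiter in buffer
false_start = data.find(delimiter[0], 0)  # hits the lone '\r' at index 0

print(full_match, false_start)  # -1 0
```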
Other issues:
* in `if self._delimiter is None and delimiter == b'\n'` the second
condition is redundant: if the first one is true, the second is also true
* the start of the delimiter is looked up twice in the `elif`, and also in
the default (non-custom) delimiter case, which is unnecessary
* the assignment `current_pos += i` has no effect, since the method's last
line overwrites it: `current_pos = len(read_buffer.data)`
Try the code below. I am not 100% sure yet that it has no edge cases of its
own, so please review and test carefully.
The beginning of the method's loop changes to:
```
while True:
  if current_pos >= len(read_buffer.data) - delimiter_len + 1:
```
...
the fixed redundant condition:
```
if self._delimiter is None and read_buffer.data[next_lf -
                                                1:next_lf] == b'\r':
```
...
The complete `elif` branch to use instead of yours, running down to the
method's last line (which is unchanged):
```
elif self._delimiter is not None:
  # Corner case: the custom delimiter is truncated at the end of
  # the buffer.
  next_lf = read_buffer.data.find(
      delimiter[0], len(read_buffer.data) - delimiter_len + 1)
  if next_lf >= 0:
    # The first possible start of a truncated delimiter, but
    # delimiters longer than 2 bytes may be truncated further.
    # Delegating full matching to the next iteration to avoid
    # an additional loop here.
    current_pos = next_lf
    continue
current_pos = len(read_buffer.data)
```
This implementation leverages the fact that inside the `elif` we already
know there is no full delimiter in the buffer, so we start searching at
`len(read_buffer.data) - delimiter_len + 1`, the first position at the edge
where a partial delimiter could sit.
Also, the same existing `_try_to_ensure_num_bytes_in_buffer` is reused to
fetch the possible delimiter continuation, and the same
`read_buffer.data.find` then checks for the truncated delimiter and
continues scanning if it was a false positive.
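To sanity-check the idea outside Beam, the scan can be sketched as a standalone generator (the names, the chunked refill, and the `chunk` parameter are simplifications I made up; this is not the actual reader code):
```
def find_separators(data, delimiter, chunk=4):
  """Yield (start, end) bounds of each delimiter occurrence, mimicking the
  fixed loop: refill the buffer, search for a full delimiter, and on
  failure look for a truncated delimiter start only near the buffer edge."""
  delimiter_len = len(delimiter)
  buf_end = 0       # how many bytes are currently "in the buffer"
  current_pos = 0
  while True:
    if current_pos >= buf_end - delimiter_len + 1:
      # Stand-in for _try_to_ensure_num_bytes_in_buffer: pull more bytes.
      buf_end = min(len(data),
                    max(buf_end + chunk, current_pos + delimiter_len))
      if current_pos >= buf_end - delimiter_len + 1:
        return  # EOF: no room left for a full delimiter.
    view = data[:buf_end]
    next_lf = view.find(delimiter, current_pos)
    if next_lf >= 0:
      yield (next_lf, next_lf + delimiter_len)
      current_pos = next_lf + delimiter_len
      continue
    # Corner case: the delimiter may be truncated at the buffer edge.
    next_lf = view.find(delimiter[:1], buf_end - delimiter_len + 1)
    if next_lf >= 0:
      current_pos = next_lf  # re-check here after the next refill
    else:
      current_pos = buf_end


# '##' is split across a refill boundary after b'ab#' is buffered.
print(list(find_separators(b'ab##cd##e', b'##', chunk=3)))
# -> [(2, 4), (6, 8)]
```
The `buf_end` refill stands in for `_try_to_ensure_num_bytes_in_buffer`, and the edge search only runs when no full delimiter was found, mirroring the `elif` above.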
The following case, a false-positive delimiter start at the buffer edge, is
also covered
(the same test pipeline as above, but with different lines and delimiter; it
should print 2):
```
temp_file.write(b'a' * (READ_BUFFER_SIZE - 3) + b'#a' + b'####')
temp_file.write(b'aaa')
...
delimiter=b'####')
...
```
##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -245,7 +245,9 @@ def _find_separator_bounds(self, file_to_read, read_buffer):
current_pos = read_buffer.position
Review comment:
```
# Use the custom delimiter to be used in place of
# the default ones ('\r', '\n' or '\r\n')'
```
Unlike Java, the Python SDK does not support the `\r` delimiter in default
mode; please update the comment above.
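For illustration, the difference can be seen with plain Python bytes (not Beam code): universal-newline splitting, as Java does by default, treats a lone `'\r'` as a line break, while splitting only on `'\n'`/`'\r\n'` does not:
```
# Plain Python, not Beam: contrast '\r'-aware splitting with '\n'-only.
data = b'a\rb\nc'

print(data.splitlines())  # [b'a', b'b', b'c'] -- lone '\r' is a break
print(data.replace(b'\r\n', b'\n').split(b'\n'))  # [b'a\rb', b'c']
```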
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]