nikie commented on a change in pull request #15775:
URL: https://github.com/apache/beam/pull/15775#discussion_r735008085



##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -257,17 +259,37 @@ def _find_separator_bounds(self, file_to_read, read_buffer):
 
       # Using find() here is more efficient than a linear scan
       # of the byte array.
-      next_lf = read_buffer.data.find(self._delimiter, current_pos)
+      next_lf = read_buffer.data.find(delimiter, current_pos)
 
       if next_lf >= 0:
-        if self._delimiter == b'\n' and read_buffer.data[next_lf -
-                                                         1:next_lf] == b'\r':
+        if self._delimiter is None and delimiter == b'\n' \
+                and read_buffer.data[next_lf - 1:next_lf] == b'\r':
+          # Default delimiter

Review comment:
       `# Default b'\n' or user defined delimiter.`

##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -257,17 +259,37 @@ def _find_separator_bounds(self, file_to_read, read_buffer):
 
       # Using find() here is more efficient than a linear scan
       # of the byte array.
-      next_lf = read_buffer.data.find(self._delimiter, current_pos)
+      next_lf = read_buffer.data.find(delimiter, current_pos)
 
       if next_lf >= 0:
-        if self._delimiter == b'\n' and read_buffer.data[next_lf -
-                                                         1:next_lf] == b'\r':
+        if self._delimiter is None and delimiter == b'\n' \
+                and read_buffer.data[next_lf - 1:next_lf] == b'\r':
+          # Default delimiter
           # Found a '\r\n'. Accepting that as the next separator.
           return (next_lf - 1, next_lf + 1)
         else:
+          # User defined delimiter
           # Found a delimiter. Accepting that as the next separator.
           return (next_lf, next_lf + delimiter_len)
 
+      elif read_buffer.data.find(delimiter[0], current_pos) >= 0:

Review comment:
       I am sorry, but this pipeline prints 1 instead of 3:
   ```
   from tempfile import NamedTemporaryFile
   import apache_beam as beam
   
   READ_BUFFER_SIZE = 8192
   
   with NamedTemporaryFile("wb") as temp_file:
     temp_file.write(b'\r' + b'a' * (READ_BUFFER_SIZE - 2) + b'\r\n')
     temp_file.write(b'\r' + b'a' * (READ_BUFFER_SIZE - 3) + b'\r\n')
     temp_file.write(b'\r' + b'a' * (READ_BUFFER_SIZE - 4) + b'\r\n')
     temp_file.flush()
     with beam.Pipeline() as pipeline:
       (
         pipeline
         | beam.io.ReadFromText(
           file_pattern=temp_file.name,
           delimiter=b'\r\n')
         | beam.combiners.Count.Globally()
         | beam.Map(print)
       )
   ```
   This is because we look up the start of the delimiter from the current position and give up if that is not a true start.
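   To see the failure concretely, here is a minimal illustration with plain `bytes.find` (not the actual Beam code path): a hit on the delimiter's first byte is not necessarily a true delimiter start, and giving up there skips every later real delimiter in the buffer.
```
buf = b'\rxxxx\r\n'   # b'\r' at index 0 is a false start; the real b'\r\n' is at 5
delimiter = b'\r\n'

# Looking up only the first byte of the delimiter finds the false start ...
first_byte_pos = buf.find(delimiter[:1], 0)
assert first_byte_pos == 0
# ... but the full delimiter does not match there:
assert buf[first_byte_pos:first_byte_pos + len(delimiter)] != delimiter
# Giving up at this point would miss the true delimiter later in the buffer:
assert buf.find(delimiter, first_byte_pos) == 5
```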
   
   Other issues:
   * in `if self._delimiter is None and delimiter == b'\n'` the second condition is redundant: if the first is true, the second is also true
   * the start of the delimiter is looked up twice in the `elif`, and also for the default (non-custom) delimiter, which is unnecessary
   * the assignment `current_pos += i` has no effect, since the method's last line overwrites it: `current_pos = len(read_buffer.data)`
   
   Try the code below. I am not yet 100% sure that it is free of its own edge cases, so please review and test carefully.
   
   The beginning of the method's loop changes to:
   ```
       while True:
         if current_pos >= len(read_buffer.data) - delimiter_len + 1:
   ```
   ...
   The condition with the redundant check removed:
   ```
           if self._delimiter is None and read_buffer.data[next_lf -
                                                           1:next_lf] == b'\r':
   ```
   ...
   The complete `elif` condition to use instead of yours, down to the method's last line (which is unchanged):
   ```
         elif self._delimiter is not None:
        # Corner case: custom delimiter is truncated at the end of the buffer.
           next_lf = read_buffer.data.find(
               delimiter[0], len(read_buffer.data) - delimiter_len + 1)
           if next_lf >= 0:
             # The first possible start of a truncated delimiter,
             # but delimiters longer than 2 bytes may be truncated further.
             # Delegating full matching to the next iteration to avoid
             # additional loop here.
             current_pos = next_lf
             continue
   
         current_pos = len(read_buffer.data)
   ```
   This implementation leverages the fact that in the `elif` we already know there is no full delimiter in the buffer, so we start searching at the buffer edge minus the delimiter length plus 1, the first position where a partial delimiter could sit.
   Also, the existing `_try_to_ensure_num_bytes_in_buffer` is reused to fetch a possible continuation of the delimiter, and the same `read_buffer.data.find` checks for the truncated delimiter and continues scanning if it was a false positive.
   
   The following case of a false-positive delimiter start at the edge is also covered
   (the same test pipeline as above, but with different lines and delimiter; it should print 2):
   ```
     temp_file.write(b'a' * (READ_BUFFER_SIZE - 3) + b'#a' + b'####')
     temp_file.write(b'aaa')
   
     ...
     delimiter=b'####')
     ...
   ```
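   For what it's worth, the edge handling above can be exercised outside Beam with a small standalone sketch. `count_records` and `ensure` here are illustrative stand-ins for `_find_separator_bounds` and `_try_to_ensure_num_bytes_in_buffer` (not the real implementations), but they follow the same scan:
```
import io

READ_BUFFER_SIZE = 8192

def count_records(data, delimiter):
    # Standalone sketch: find full delimiters in a growing read buffer,
    # and when none is found, re-check only the buffer edge for a
    # truncated delimiter start.
    stream = io.BytesIO(data)
    buf = bytearray()
    pos = 0        # scan position (current_pos in the review)
    last_end = 0   # end of the last fully matched delimiter
    count = 0
    dlen = len(delimiter)

    def ensure(num_bytes):
        # Stand-in for _try_to_ensure_num_bytes_in_buffer: refill the
        # buffer until it holds num_bytes, or report EOF.
        while len(buf) < num_bytes:
            chunk = stream.read(READ_BUFFER_SIZE)
            if not chunk:
                return False
            buf.extend(chunk)
        return True

    while True:
        if pos >= len(buf) - dlen + 1:
            if not ensure(len(buf) + 1):
                break
        next_d = buf.find(delimiter, pos)
        if next_d >= 0:
            count += 1
            pos = last_end = next_d + dlen
        else:
            # Corner case: the delimiter may be truncated at the edge,
            # so re-check only the last dlen - 1 bytes for its start.
            edge = buf.find(delimiter[:1], len(buf) - dlen + 1)
            if edge >= 0:
                pos = edge  # delegate full matching to the next iteration
                continue
            pos = len(buf)

    if last_end < len(buf):
        count += 1  # trailing bytes after the last delimiter form a record
    return count
```
   With the contents of the two test files above, this returns 3 (the `b'\r\n'` case) and 2 (the `b'####'` case) respectively.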
   

##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -245,7 +245,9 @@ def _find_separator_bounds(self, file_to_read, read_buffer):
 
     current_pos = read_buffer.position

Review comment:
       ```
       # Use the custom delimiter to be used in place of
       # the default ones ('\r', '\n' or '\r\n')'
   ```
   Unlike Java, the Python SDK does not support the `\r` delimiter in default mode; please update the comment above.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

