dmitriikuzinepam commented on a change in pull request #15775:
URL: https://github.com/apache/beam/pull/15775#discussion_r738992972



##########
File path: sdks/python/apache_beam/io/textio.py
##########
@@ -262,33 +267,27 @@ def _find_separator_bounds(self, file_to_read, 
read_buffer):
       next_lf = read_buffer.data.find(delimiter, current_pos)
 
       if next_lf >= 0:
-        if self._delimiter is None and delimiter == b'\n' \
+        if self._delimiter is None \
                 and read_buffer.data[next_lf - 1:next_lf] == b'\r':
-          # Default delimiter
+          # Default b'\n' or user defined delimiter
           # Found a '\r\n'. Accepting that as the next separator.
           return (next_lf - 1, next_lf + 1)
         else:
           # User defined delimiter
           # Found a delimiter. Accepting that as the next separator.
           return (next_lf, next_lf + delimiter_len)
 
-      elif read_buffer.data.find(delimiter[0], current_pos) >= 0:
-        # Corner case: delimiter truncated at the end of the file
-        current_delimiter_pos = read_buffer.data.find(delimiter[0], 
current_pos)
-
-        i = 0
-        while i < len(delimiter) and read_buffer.data[current_delimiter_pos +
-                                                      i] == delimiter[i]:
-          i += 1
-          if not self._try_to_ensure_num_bytes_in_buffer(
-              file_to_read, read_buffer, current_delimiter_pos + i + 1):
-            break
-
-        if i == delimiter_len:
-          # All bytes of delimiter found
-          return current_delimiter_pos, current_delimiter_pos + delimiter_len
-
-        current_pos += i
+      elif self._delimiter is not None:
+        # Corner case: custom delimiter is truncated at the end of the buffer.
+        next_lf = read_buffer.data.find(

Review comment:
       No, the '\r' would not get appended to the input.
   I checked the case next code, read text without any transformation, only try 
to find extra symbol '\r'
   ```
   READ_BUFFER_SIZE = 8191
   # DEFAULT_READ_BUFFER_SIZE = 8192
   delimiter = b'\r\n'
   
   with NamedTemporaryFile("wb") as temp_file:
     temp_file.write(b'a' * READ_BUFFER_SIZE + delimiter + b'bbbbb')
     temp_file.flush()
     with beam.Pipeline() as pipeline:
       (
               pipeline
               | beam.io.ReadFromText(
   
                 file_pattern=temp_file.name,
                 delimiter=delimiter)
               | beam.Map(lambda x: print(x.find(delimiter.decode('utf-8')) >= 
0))
       )
   ```
   
   output:
   False
   False
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to