George-Wu commented on a change in pull request #12331:
URL: https://github.com/apache/beam/pull/12331#discussion_r463382681



##########
File path: sdks/python/apache_beam/io/gcp/dicomio.py
##########
@@ -426,6 +485,44 @@ def process(self, element):
 
     out = {}
     out['status'] = status_code
-    out['input'] = None if status_code == 200 else element
+    out['input'] = None if status_code == 200 else dicom_file
     out['success'] = (status_code == 200)
-    return [out]
+    return out
+
+  def read_dicom_file(self, buffer_element):
+    # Read the file based on different input. If the read fails, return
+    # an error dict which records input and error messages.
+    try:
+      if self.input_type == 'fileio':
+        f = buffer_element.open()
+        return True, f.read()
+      else:
+        return True, buffer_element
+    except Exception as error_message:
+      error_out = {}
+      error_out['status'] = error_message
+      error_out['input'] = buffer_element
+      error_out['success'] = False
+      return False, error_out
+
+  def process_buffer_element(self, buffer_element):
+    # Thread job runner - each thread stores a DICOM file
+    success, read_result = self.read_dicom_file(buffer_element[0])
+    windows = [buffer_element[1]]
+    value = None
+    if success:
+      value = self.make_request(read_result)
+    else:
+      value = read_result
+    return beam.utils.windowed_value.WindowedValue(
+        value=value, timestamp=0, windows=windows)

Review comment:
       Added timestamps




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to