George-Wu commented on a change in pull request #12331:
URL: https://github.com/apache/beam/pull/12331#discussion_r463382756



##########
File path: sdks/python/apache_beam/io/gcp/dicomio.py
##########
@@ -372,52 +420,63 @@ def __init__(self, destination_dict, input_type, 
credential=None):
       credential: # type: Google credential object, if it is specified, the
       Http client will use it instead of the default one.
     """
-    self.credential = credential
     self.destination_dict = destination_dict
     # input_type pre-check
     if input_type not in ['bytes', 'fileio']:
       raise ValueError("input_type could only be 'bytes' or 'fileio'")
     self.input_type = input_type
+    self.buffer_size = buffer_size
+    self.max_workers = max_workers
+    self.credential = credential
 
   def expand(self, pcoll):
     return pcoll | beam.ParDo(
-        _StoreInstance(self.destination_dict, self.input_type, 
self.credential))
+        _StoreInstance(
+            self.destination_dict,
+            self.input_type,
+            self.buffer_size,
+            self.max_workers,
+            self.credential))
 
 
 class _StoreInstance(beam.DoFn):
   """A DoFn read or fetch dicom files then push it to a dicom store."""
-  def __init__(self, destination_dict, input_type, credential=None):
-    self.credential = credential
+  def __init__(
+      self,
+      destination_dict,
+      input_type,
+      buffer_size,
+      max_workers,
+      credential=None):

Review comment:
       Add custom client support

##########
File path: sdks/python/apache_beam/io/gcp/dicomio.py
##########
@@ -164,70 +167,109 @@ class DicomSearch(PTransform):
     }
 
   """
-  def __init__(self, credential=None):
+  def __init__(self, buffer_size=8, max_workers=5, credential=None):
     """Initializes DicomSearch.
     Args:
       credential: # type: Google credential object, if it is specified, the
       Http client will use it to create sessions instead of the default.
     """
+    self.buffer_size = buffer_size
+    self.max_workers = max_workers
     self.credential = credential
 
   def expand(self, pcoll):
-    return pcoll | beam.ParDo(_QidoSource(self.credential))
+    return pcoll | beam.ParDo(
+        _QidoSource(self.buffer_size, self.max_workers, self.credential))
 
 
 class _QidoSource(beam.DoFn):
   """A DoFn for executing every qido query request."""
-  def __init__(self, credential=None):
+  def __init__(self, buffer_size, max_workers, credential=None):
+    self.buffer_size = buffer_size
+    self.max_workers = max_workers
     self.credential = credential
 
-  def process(self, element):
+  def start_bundle(self):
+    self.buffer = []
+
+  def finish_bundle(self):
+    return self._flush()
+
+  def validate_element(self, element):
     # Check if all required keys present.
     required_keys = [
         'project_id', 'region', 'dataset_id', 'dicom_store_id', 'search_type'
     ]
 
-    error_message = None
-
     for key in required_keys:
       if key not in element:
         error_message = 'Must have %s in the dict.' % (key)
-        break
-
-    if not error_message:
-      project_id = element['project_id']
-      region = element['region']
-      dataset_id = element['dataset_id']
-      dicom_store_id = element['dicom_store_id']
-      search_type = element['search_type']
-      params = element['params'] if 'params' in element else None
-
-      # Call qido search http client
-      if element['search_type'] in ['instances', "studies", "series"]:
-        result, status_code = DicomApiHttpClient().qido_search(
-          project_id, region, dataset_id, dicom_store_id,
-          search_type, params, self.credential
-        )
-      else:
-        error_message = (
-            'Search type can only be "studies", '
-            '"instances" or "series"')
-
-      if not error_message:
-        out = {}
-        out['result'] = result
-        out['status'] = status_code
-        out['input'] = element
-        out['success'] = (status_code == 200)
-        return [out]
-
-    # Return this when the input dict dose not meet the requirements
+        return False, error_message
+
+    # Check if return type is correct.
+    if element['search_type'] in ['instances', "studies", "series"]:
+      return True, None
+    else:
+      error_message = (
+          'Search type can only be "studies", '
+          '"instances" or "series"')
+      return False, error_message
+
+  def process(self, element, window=beam.DoFn.WindowParam):
+    # Check if the element is valid
+    valid, error_message = self.validate_element(element)
+
+    if valid:
+      self.buffer.append((element, window))
+      if len(self.buffer) >= self.buffer_size:
+        self._flush()

Review comment:
       Fixed and test added

##########
File path: sdks/python/apache_beam/io/gcp/dicomio.py
##########
@@ -164,70 +167,109 @@ class DicomSearch(PTransform):
     }
 
   """
-  def __init__(self, credential=None):
+  def __init__(self, buffer_size=8, max_workers=5, credential=None):
     """Initializes DicomSearch.
     Args:
       credential: # type: Google credential object, if it is specified, the
       Http client will use it to create sessions instead of the default.
     """
+    self.buffer_size = buffer_size
+    self.max_workers = max_workers
     self.credential = credential
 
   def expand(self, pcoll):
-    return pcoll | beam.ParDo(_QidoSource(self.credential))
+    return pcoll | beam.ParDo(
+        _QidoSource(self.buffer_size, self.max_workers, self.credential))

Review comment:
       Changed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to