[ 
https://issues.apache.org/jira/browse/BEAM-10601?focusedWorklogId=464845&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-464845
 ]

ASF GitHub Bot logged work on BEAM-10601:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Jul/20 03:10
            Start Date: 31/Jul/20 03:10
    Worklog Time Spent: 10m 
      Work Description: George-Wu commented on a change in pull request #12331:
URL: https://github.com/apache/beam/pull/12331#discussion_r463382633



##########
File path: sdks/python/apache_beam/io/gcp/dicomio.py
##########
@@ -372,52 +420,63 @@ def __init__(self, destination_dict, input_type, credential=None):
       credential: # type: Google credential object, if it is specified, the
       Http client will use it instead of the default one.
     """
-    self.credential = credential
     self.destination_dict = destination_dict
     # input_type pre-check
     if input_type not in ['bytes', 'fileio']:
       raise ValueError("input_type could only be 'bytes' or 'fileio'")
     self.input_type = input_type
+    self.buffer_size = buffer_size
+    self.max_workers = max_workers
+    self.credential = credential
 
   def expand(self, pcoll):
     return pcoll | beam.ParDo(
-        _StoreInstance(self.destination_dict, self.input_type, self.credential))
+        _StoreInstance(
+            self.destination_dict,
+            self.input_type,
+            self.buffer_size,
+            self.max_workers,
+            self.credential))
 
 
 class _StoreInstance(beam.DoFn):
   """A DoFn read or fetch dicom files then push it to a dicom store."""
-  def __init__(self, destination_dict, input_type, credential=None):
-    self.credential = credential
+  def __init__(
+      self,
+      destination_dict,
+      input_type,
+      buffer_size,
+      max_workers,
+      credential=None):
     # pre-check destination dict
     required_keys = ['project_id', 'region', 'dataset_id', 'dicom_store_id']
     for key in required_keys:
       if key not in destination_dict:
         raise ValueError('Must have %s in the dict.' % (key))
     self.destination_dict = destination_dict
     self.input_type = input_type
+    self.buffer_size = buffer_size
+    self.max_workers = max_workers
+    self.credential = credential
 
-  def process(self, element):
+  def start_bundle(self):
+    self.buffer = []
+
+  def finish_bundle(self):
+    return self._flush()
+
+  def process(self, element, window=beam.DoFn.WindowParam):
+    self.buffer.append((element, window))
+    if len(self.buffer) >= self.buffer_size:
+      self._flush()

Review comment:
       Good catch, fixed and tests added!
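
For readers following the hunk above, here is a minimal sketch of the buffer-and-flush pattern it introduces, written as plain Python so it runs without Beam installed. The class name `BufferedSender` and the `send_fn` callable are hypothetical stand-ins for illustration; only `buffer_size`, `max_workers`, `start_bundle`, `process`, `finish_bundle` and `_flush` are names taken from the diff. The use of a thread pool for `max_workers` is an assumption about how that parameter might be used, not something the quoted hunk shows. The sketch also returns the flushed results from `process` rather than discarding them, which is an illustrative choice and not a claim about what the PR ended up doing.

```python
from concurrent.futures import ThreadPoolExecutor


class BufferedSender:
    """Plain-Python stand-in for a DoFn that batches elements per bundle."""

    def __init__(self, send_fn, buffer_size, max_workers):
        self.send_fn = send_fn  # hypothetical per-element upload callable
        self.buffer_size = buffer_size
        self.max_workers = max_workers
        self.buffer = []

    def start_bundle(self):
        # Start every bundle with an empty buffer.
        self.buffer = []

    def process(self, element):
        self.buffer.append(element)
        if len(self.buffer) >= self.buffer_size:
            # Hand the flushed results back so the caller can emit them.
            return self._flush()
        return []

    def finish_bundle(self):
        # Send whatever is left when the bundle ends.
        return self._flush()

    def _flush(self):
        if not self.buffer:
            return []
        pending, self.buffer = self.buffer, []
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            # Executor.map yields results in the order of the inputs.
            return list(pool.map(self.send_fn, pending))
```

With `buffer_size=3`, seven elements would be sent as two full batches during `process` and one leftover element in `finish_bundle`.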




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 464845)
    Time Spent: 1h 10m  (was: 1h)

> DICOM API Beam IO connector
> ---------------------------
>
>                 Key: BEAM-10601
>                 URL: https://issues.apache.org/jira/browse/BEAM-10601
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-py-gcp
>            Reporter: Jiahao Wu
>            Assignee: Jiahao Wu
>            Priority: P2
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Create a new Apache Beam I/O connector that helps customers read and write 
> data through the DICOM Healthcare API. It has three components:
>  # A PTransform that takes a QIDO request and outputs the result metadata as 
> a PCollection.
>  # An I/O sink that takes DICOM files and writes them to a DICOM store via the API.
>  # A PTransform that converts a Pub/Sub message to a QIDO search request.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
