rohdesamuel commented on a change in pull request #15490:
URL: https://github.com/apache/beam/pull/15490#discussion_r708427907



##########
File path: sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
##########
@@ -267,27 +284,65 @@ def _build_query_components(
   """
   if found:
     user_pipeline = next(iter(found.values())).pipeline
-    cache_manager = ie.current_env().get_cache_manager(user_pipeline)
-    instrumentation = inst.build_pipeline_instrument(user_pipeline)
     sql_pipeline = beam.Pipeline(options=user_pipeline._options)
     ie.current_env().add_derived_pipeline(user_pipeline, sql_pipeline)
     sql_source = {}
-    if instrumentation.has_unbounded_sources:
+    if has_source_to_cache(user_pipeline):
       sql_source = pcolls_from_streaming_cache(
-          user_pipeline, sql_pipeline, found, instrumentation, cache_manager)
+          user_pipeline, sql_pipeline, found)
     else:
+      cache_manager = ie.current_env().get_cache_manager(
+          user_pipeline, create_if_absent=True)
       for pcoll_name, pcoll in found.items():
-        cache_key = instrumentation.cache_key(pcoll)
+        cache_key = CacheKey.from_pcoll(pcoll_name, pcoll).to_str()
         sql_source[pcoll_name] = pcoll_from_file_cache(
             sql_pipeline, pcoll, cache_manager, cache_key)
     if len(sql_source) == 1:
       query = replace_single_pcoll_token(query, next(iter(sql_source.keys())))
       sql_source = next(iter(sql_source.values()))
   else:
     sql_source = beam.Pipeline()
+    ie.current_env().add_user_pipeline(sql_source)
   return query, sql_source
 
 
+@progress_indicated
+def cache_output(output_name: str, output: PValue) -> None:
+  user_pipeline = ie.current_env().user_pipeline(output.pipeline)
+  if user_pipeline:
+    cache_manager = ie.current_env().get_cache_manager(
+        user_pipeline, create_if_absent=True)
+  else:
+    _LOGGER.warning(
+        'Something is wrong with %s. Cannot introspect its data.', output)
+    return
+  key = CacheKey.from_pcoll(output_name, output).to_str()
+
+  class Reify(beam.DoFn):
+    def process(
+        self,
+        e,
+        w=beam.DoFn.WindowParam,
+        p=beam.DoFn.PaneInfoParam,
+        t=beam.DoFn.TimestampParam):
+      yield test_stream.WindowedValueHolder(WindowedValue(e, t, [w], p))
+
+  _ = (
+      output | '{}{}'.format('Reify', key) >> beam.ParDo(Reify())
+      | '{}{}'.format('WriteSqlOutputCache', key) >> cache.WriteCache(
+          cache_manager, key))

Review comment:
       I'm worried that this and the `pipeline_instrument._write_cache` can get 
out of sync with this duplicated code. Can you please refactor this to use a 
shared method with the pipeline_instrument? Maybe have a module-level 
pipeline_instrument method or PTransform called `ReifyToCache`.

##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -466,3 +465,25 @@ def record(self, pcolls, max_n, max_duration):
     self._recordings.add(recording)
 
     return recording
+
+  def read(self, pcoll_name: str, pcoll: beam.pvalue.PCollection):

Review comment:
       Awesome, thank you!

##########
File path: sdks/python/apache_beam/runners/interactive/recording_manager.py
##########
@@ -466,3 +465,25 @@ def record(self, pcolls, max_n, max_duration):
     self._recordings.add(recording)
 
     return recording
+
+  def read(self, pcoll_name: str, pcoll: beam.pvalue.PCollection):
+    # type: (str, beam.pvalue.PValue) -> Union[None, ElementStream]
+
+    """Reads an ElementStream of a computed PCollection.
+
+    Returns None if an error occurs. The caller is responsible of validating if
+    the given pcoll_name and pcoll can identify a watched and computed
+    PCollection without ambiguity in the notebook.
+    """
+
+    try:
+      cache_key = CacheKey.from_pcoll(pcoll_name, pcoll).to_str()
+      return ElementStream(
+          pcoll, pcoll_name, cache_key, float('inf'), float('inf'))

Review comment:
       Please add max_n and max_duration as arguments with defaults as 'inf'

##########
File path: sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
##########
@@ -110,15 +126,15 @@ def beam_sql(self, line: str, cell: str) -> Union[None, 
PValue]:
         return
       register_coder_for_schema(pcoll.element_type)
 
-    # TODO(BEAM-10708): implicitly execute the pipeline and write output into
-    # cache.
-    return apply_sql(cell, line, found)
+    output_name, output = apply_sql(cell, line, found)
+    cache_output(output_name, output)

Review comment:
       I can imagine the use case that a user might be modifying the DB in a 
separate application and they want the most recent view instead of a cached 
view. How might this use case be supported with this caching logic?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to