This is an automated email from the ASF dual-hosted git repository.
ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7acadb6 [BEAM-10708] Support streaming cache in beam_sql magic
new a871a49 Merge pull request #15446 from KevinGG/ib-ts-coder
7acadb6 is described below
commit 7acadb6334e9bb24c1033e57d904e3522e495f52
Author: KevinGG <[email protected]>
AuthorDate: Wed Sep 1 15:11:51 2021 -0700
[BEAM-10708] Support streaming cache in beam_sql magic
Updated the query source to always mark their element_type so that
pickled Python coders are not introduced to the Java expansion service
for SqlTransforms.
---
sdks/python/apache_beam/runners/interactive/cache_manager.py | 2 --
.../apache_beam/runners/interactive/caching/streaming_cache.py | 2 --
sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py | 6 +++++-
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py
b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index 886e56e..9ed0b25 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -208,8 +208,6 @@ class FileBasedCacheManager(CacheManager):
def load_pcoder(self, *labels):
saved_pcoder = self._saved_pcoders.get(self._path(*labels), None)
- # TODO(BEAM-12506): Get rid of the SafeFastPrimitivesCoder for
- # WindowedValueHolder.
if saved_pcoder is None or isinstance(saved_pcoder,
coders.FastPrimitivesCoder):
return self._default_pcoder
diff --git
a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
index 054c9a6..fc8a8aa 100644
--- a/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
+++ b/sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
@@ -390,8 +390,6 @@ class StreamingCache(CacheManager):
def load_pcoder(self, *labels):
saved_pcoder = self._saved_pcoders.get(
os.path.join(self._cache_dir, *labels), None)
- # TODO(BEAM-12506): Get rid of the SafeFastPrimitivesCoder for
- # WindowedValueHolder.
if saved_pcoder is None or isinstance(saved_pcoder,
coders.FastPrimitivesCoder):
return self._default_pcoder
diff --git a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
index cee3d34..1dc42e0 100644
--- a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
+++ b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
@@ -227,7 +227,11 @@ def pcolls_from_streaming_cache(
endpoint=test_stream_service.endpoint)
sql_source = {}
for tag, output in output_pcolls.items():
- sql_source[tag_to_name[tag]] = output
+ name = tag_to_name[tag]
+ # Must mark the element_type to avoid introducing pickled Python coder
+ # to the Java expansion service.
+ output.element_type = name_to_pcoll[name].element_type
+ sql_source[name] = output
return sql_source