This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c627e472bd7 Merge pull request #21928 from [Fixes #21927] Compress (Un)BoundedSourceAsSdfWrapper element and restriction coders
c627e472bd7 is described below

commit c627e472bd727936ff15da7bbbd60e6d48c13680
Author: Luke Cwik <[email protected]>
AuthorDate: Fri Jun 17 16:15:27 2022 -0700

    Merge pull request #21928 from [Fixes #21927] Compress (Un)BoundedSourceAsSdfWrapper element and restriction coders
    
    * [Fixes #21927] Compress BoundedSourceAsSdfWrapper element and restriction coders
    
    A typical BoundedSource may be split into many BoundedSource instances during initial splitting. A simple test with BigtableSource showed that encoding 10 instances after splitting took on average 102660 bytes, while compressing each instance separately after encoding took 1639 bytes, a >60x improvement.
    
    * Also handle unbounded sources.
---
 CHANGES.md                                               |  1 +
 .../core/src/main/java/org/apache/beam/sdk/io/Read.java  | 16 ++++++++++------
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 9cb2830aa6a..97fbd3f5874 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -75,6 +75,7 @@
 
 * The Go Sdk now requires a minimum version of 1.18 in order to support 
generics ([BEAM-14347](https://issues.apache.org/jira/browse/BEAM-14347)).
 * synthetic.SourceConfig field types have changed to int64 from int for better 
compatibility with Flink's use of Logical types in Schemas (Go) 
([BEAM-14173](https://issues.apache.org/jira/browse/BEAM-14173))
+* Default coder updated to compress sources used with 
`BoundedSourceAsSDFWrapperFn` and `UnboundedSourceAsSDFWrapper`.
 
 ## Deprecations
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 26770f49144..6fe0b6df05b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.SnappyCoder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import 
org.apache.beam.sdk.io.UnboundedSource.CheckpointMark.NoopCheckpointMark;
@@ -148,7 +149,7 @@ public class Read {
           .getPipeline()
           .apply(Impulse.create())
           .apply(ParDo.of(new OutputSingleSource<>(source)))
-          .setCoder(SerializableCoder.of(new 
TypeDescriptor<BoundedSource<T>>() {}))
+          .setCoder(SnappyCoder.of(SerializableCoder.of(new 
TypeDescriptor<BoundedSource<T>>() {})))
           .apply(ParDo.of(new BoundedSourceAsSDFWrapperFn<>()))
           .setCoder(source.getOutputCoder())
           
.setTypeDescriptor(source.getOutputCoder().getEncodedTypeDescriptor());
@@ -219,7 +220,9 @@ public class Read {
               .apply(Impulse.create())
               .apply(ParDo.of(new OutputSingleSource<>(source)))
               .setCoder(
-                  SerializableCoder.of(new TypeDescriptor<UnboundedSource<T, 
CheckpointMark>>() {}))
+                  SnappyCoder.of(
+                      SerializableCoder.of(
+                          new TypeDescriptor<UnboundedSource<T, 
CheckpointMark>>() {})))
               .apply(ParDo.of(createUnboundedSdfWrapper()))
               .setCoder(ValueWithRecordIdCoder.of(source.getOutputCoder()));
 
@@ -317,7 +320,7 @@ public class Read {
 
     @GetRestrictionCoder
     public Coder<BoundedSourceT> restrictionCoder() {
-      return SerializableCoder.of(new TypeDescriptor<BoundedSourceT>() {});
+      return SnappyCoder.of(SerializableCoder.of(new 
TypeDescriptor<BoundedSourceT>() {}));
     }
 
     /**
@@ -603,9 +606,10 @@ public class Read {
 
     @GetRestrictionCoder
     public Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> 
restrictionCoder() {
-      return new UnboundedSourceRestrictionCoder<>(
-          SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, 
CheckpointT>>() {}),
-          NullableCoder.of(checkpointCoder));
+      return SnappyCoder.of(
+          new UnboundedSourceRestrictionCoder<>(
+              SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, 
CheckpointT>>() {}),
+              NullableCoder.of(checkpointCoder)));
     }
 
     /**

Reply via email to