[ https://issues.apache.org/jira/browse/BEAM-11146?focusedWorklogId=507074&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-507074 ]

ASF GitHub Bot logged work on BEAM-11146:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Nov/20 14:17
            Start Date: 03/Nov/20 14:17
    Worklog Time Spent: 10m 
      Work Description: je-ik commented on a change in pull request #13240:
URL: https://github.com/apache/beam/pull/13240#discussion_r516514156



##########
File path: 
runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
##########
@@ -82,10 +81,14 @@ public T createInstance() {
 
   @Override
   public T copy(T t) {

Review comment:
       Maybe rename to `copyIfNeeded`, since it doesn't copy anything when the
flag is on.

##########
File path: 
runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
##########
@@ -49,20 +50,18 @@
    * org.apache.beam.sdk.io.FileSystems} registration needed for {@link
    * org.apache.beam.sdk.transforms.Reshuffle} translation.
    */
-  @SuppressWarnings("unused")
-  private final @Nullable SerializablePipelineOptions pipelineOptions;
+  private final SerializablePipelineOptions pipelineOptions;
 
-  public CoderTypeSerializer(Coder<T> coder) {
-    Preconditions.checkNotNull(coder);
-    this.coder = coder;
-    this.pipelineOptions = null;
-  }
+  private final boolean fasterCopy;

Review comment:
       Small suggestion: can we name this `zeroCopy`, since that is what it
actually is?

##########
File path: 
runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
##########
@@ -82,10 +81,14 @@ public T createInstance() {
 
   @Override
   public T copy(T t) {

Review comment:
       Ah, I missed the `@Override` annotation. Understood.

##########
File path: 
runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
##########
@@ -49,20 +50,18 @@
    * org.apache.beam.sdk.io.FileSystems} registration needed for {@link
    * org.apache.beam.sdk.transforms.Reshuffle} translation.
    */
-  @SuppressWarnings("unused")
-  private final @Nullable SerializablePipelineOptions pipelineOptions;
+  private final SerializablePipelineOptions pipelineOptions;
 
-  public CoderTypeSerializer(Coder<T> coder) {
-    Preconditions.checkNotNull(coder);
-    this.coder = coder;
-    this.pipelineOptions = null;
-  }
+  private final boolean fasterCopy;

Review comment:
       Okay, what about `disableValueClone`? I think `fasterCopy` is a little
misleading as well. Faster than what? And why was it slower before? :)
It would be good for the flag to describe its direct effect (disabling or
enabling something) rather than a consequence (being faster).

Review comment:
       To be clear: although I attached my comment to the internal field, I
mostly had the flag in PipelineOptions in mind. The two names should be aligned.

Review comment:
       Agreed that this is not 100% important, but where possible we should make
the options passed in PipelineOptions as self-explanatory as possible,
because these options are user-facing.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.

Issue Time Tracking
-------------------

    Worklog Id:     (was: 507074)
    Time Spent: 1h 10m  (was: 1h)

> Add option to disable copying between Flink runner 
> ---------------------------------------------------
>
>                 Key: BEAM-11146
>                 URL: https://issues.apache.org/jira/browse/BEAM-11146
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Teodor Spæren
>            Assignee: Teodor Spæren
>            Priority: P2
>              Labels: performance
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> To implement Flink's
> [TypeSerializer|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java]
> interface, the runner provides
> [CoderTypeSerializer|https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84].
> Its {{copy}} function is implemented by first serializing and then
> deserializing each element. This means a deep copy is made between every pair
> of operators, which can become a bottleneck.
> The {{copy}} function has to be implemented because Flink guarantees that
> elements are deep copied between operators. In Beam, avoiding mutation of
> elements is the user's responsibility, so this copy is not strictly necessary.
> The aim of this improvement is to add a Flink runner option that eliminates
> this overhead by having {{copy}} simply return the value.
> [Here is the mailing list 
> discussion|https://lists.apache.org/thread.html/r24129dba98782e1cf4d18ec738ab9714dceb05ac23f13adfac5baad1%40%3Cdev.beam.apache.org%3E]
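
The two {{copy}} behaviours described in the issue can be sketched in plain Java. This is an illustration under stated assumptions, not the runner's code: `SimpleCoder` is a hypothetical stand-in for Beam's `Coder`, `SketchSerializer` for `CoderTypeSerializer`, `IntListCoder` is a toy coder, and `disableValueClone` is just one of the flag names proposed in the review above. With the flag off, `copy` deep-copies by encoding and then decoding the element; with it on, `copy` returns the input unchanged, which relies on user code never mutating elements.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopyBehaviorSketch {

  /** Hypothetical stand-in for Beam's Coder: encodes/decodes values to/from bytes. */
  interface SimpleCoder<T> {
    void encode(T value, OutputStream out) throws IOException;

    T decode(InputStream in) throws IOException;
  }

  /** Hypothetical serializer mirroring the copy() logic discussed in BEAM-11146. */
  static final class SketchSerializer<T> {
    private final SimpleCoder<T> coder;
    // Hypothetical flag; the thread debates fasterCopy / zeroCopy / disableValueClone.
    private final boolean disableValueClone;

    SketchSerializer(SimpleCoder<T> coder, boolean disableValueClone) {
      this.coder = coder;
      this.disableValueClone = disableValueClone;
    }

    T copy(T value) {
      if (disableValueClone) {
        // No copy at all: safe only if no operator mutates the element.
        return value;
      }
      try {
        // Deep copy via an encode/decode round trip, as the issue describes.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        coder.encode(value, bytes);
        return coder.decode(new ByteArrayInputStream(bytes.toByteArray()));
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }

  /** Toy coder for List<Integer>: a size prefix followed by each element. */
  static final class IntListCoder implements SimpleCoder<List<Integer>> {
    @Override
    public void encode(List<Integer> value, OutputStream out) throws IOException {
      DataOutputStream data = new DataOutputStream(out);
      data.writeInt(value.size());
      for (int v : value) {
        data.writeInt(v);
      }
      data.flush();
    }

    @Override
    public List<Integer> decode(InputStream in) throws IOException {
      DataInputStream data = new DataInputStream(in);
      int size = data.readInt();
      List<Integer> result = new ArrayList<>(size);
      for (int i = 0; i < size; i++) {
        result.add(data.readInt());
      }
      return result;
    }
  }

  public static void main(String[] args) {
    List<Integer> original = new ArrayList<>(Arrays.asList(1, 2, 3));

    SketchSerializer<List<Integer>> cloning = new SketchSerializer<>(new IntListCoder(), false);
    List<Integer> cloned = cloning.copy(original);
    System.out.println(cloned.equals(original) + " " + (cloned != original)); // true true

    SketchSerializer<List<Integer>> passThrough = new SketchSerializer<>(new IntListCoder(), true);
    System.out.println(passThrough.copy(original) == original); // true
  }
}
```

The round trip costs one full encode and decode per element per operator boundary, which is the overhead the issue wants to make optional; the pass-through branch removes it entirely but gives up protection against an operator mutating a shared element.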



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
