[ 
https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=173687&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173687
 ]

ASF GitHub Bot logged work on BEAM-5775:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Dec/18 16:42
            Start Date: 10/Dec/18 16:42
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on a change in pull request #6714: 
[BEAM-5775] Spark: implement a custom class to lazily encode values for 
persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r233476897
 
 

 ##########
 File path: 
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java
 ##########
 @@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.io.ByteStreams;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.VarInt;
+
+/**
+ * A holder object that lets you serialize an element with a Coder with 
minimal wasted space.
+ * Supports both Kryo and Java serialization.
+ *
+ * <p>There are two different representations: a deserialized representation 
and a serialized
+ * representation.
+ *
+ * <p>The deserialized representation stores a Coder and the value. To 
serialize the value, we write
+ * a length-prefixed encoding of value, but do NOT write the Coder used.
+ *
+ * <p>The serialized representation just reads a byte array - the value is not 
deserialized fully.
+ * In order to get at the deserialized value, the caller must pass the Coder 
used to create this
+ * instance via get(Coder). This reverts the representation back to the 
deserialized representation.
+ *
+ * @param <T> element type
+ */
+public class ValueAndCoderKryoSerializable<T> implements KryoSerializable, 
Externalizable {
+  private T value;
+  // Re-use a field to save space in-memory. This is either a byte[] or a 
Coder, depending on
+  // which representation we are in.
+  private Object coderOrBytes;
+
+  ValueAndCoderKryoSerializable(T value, Coder<T> currentCoder) {
+    this.value = value;
+    this.coderOrBytes = currentCoder;
+  }
+
+  @SuppressWarnings("unused") // for serialization
+  public ValueAndCoderKryoSerializable() {}
+
+  public T get(Coder<T> coder) throws IOException {
+    if (!(coderOrBytes instanceof Coder)) {
+      value =
+          coder.decode(new ByteArrayInputStream((byte[]) this.coderOrBytes), 
Coder.Context.OUTER);
+      this.coderOrBytes = coder;
+    }
+
+    return value;
+  }
+
+  private void writeCommon(OutputStream out) throws IOException {
+    if (!(coderOrBytes instanceof Coder)) {
+      byte[] bytes = (byte[]) coderOrBytes;
+      VarInt.encode(bytes.length, out);
+      out.write(bytes);
+    } else {
+      int bufferSize = 1024;
+      // TODO: use isRegisterByteSizeObserverCheap
 
 Review comment:
   TODO ? can this be done in this PR?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 173687)
    Time Spent: 1h  (was: 50m)

> Make the spark runner not serialize data unless spark is spilling to disk
> -------------------------------------------------------------------------
>
>                 Key: BEAM-5775
>                 URL: https://issues.apache.org/jira/browse/BEAM-5775
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Mike Kaplinskiy
>            Assignee: Mike Kaplinskiy
>            Priority: Minor
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Currently for storage level MEMORY_ONLY, Beam does not coder-ify the data. 
> This lets Spark keep the data in memory avoiding the serialization round 
> trip. Unfortunately the logic is fairly coarse - as soon as you switch to 
> MEMORY_AND_DISK, Beam coder-ifys the data even though Spark might have chosen 
> to keep the data in memory, incurring the serialization overhead.
>  
> Ideally Beam would serialize the data lazily - as Spark chooses to spill to 
> disk. This would be a change in behavior when using beam, but luckily Spark 
> has a solution for folks that want data serialized in memory - 
> MEMORY_AND_DISK_SER will keep the data serialized.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to