GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1913#discussion_r62011352
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/SerializableCacheableValue.java ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.util;
    +
    +
    +import java.io.IOException;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +/**
    + * A data structure that enables to keep a value which may be serialized/deserialized using a custom
    + * Classloader. The encapsulated value is kept locally for further use after a copy of the class has
    + * been serialized and shipped somewhere else.
    + *
    + * This is useful if we want to keep working with a value which may require a custom classloader in
    + * one context but is fine with the system/context Classloader in other cases. The value is cached
    + * as long as possible to prevent unnecessary serialization/deserialization.
    + */
    +public class SerializableCacheableValue<T> implements Serializable {
    +
    +   /** The current cached value. Lost when serialized. */
    +   private transient T value;
    +
    +   private SerializedValue<T> serializedValue;
    +
    +   public SerializableCacheableValue(T value) {
    +           update(value);
    +   }
    +
    +   /**
    +    * Custom serialization methods which always writes the latest value.
    +     */
    +   private void writeObject(ObjectOutputStream out) throws IOException {
    +           // trigger serialization once more to update to the least recent value
    +           serialize();
    +           out.defaultWriteObject();
    +   }
    +
    +   /**
    +    * Serialization, e.g. before shipping the class
    +    */
    +   public void serialize() throws IOException {
    +           if (value != null) {
    +                   serializedValue = new SerializedValue<>(value);
    +           }
    +   }
    +
    +   /**
    +    * Explicit deserialiation using a provided class loader.
    +    * @param classLoader The class loader to use
    +    */
    +   public void deserialize(ClassLoader classLoader) {
    +           if (serializedValue != null) {
    +                   try {
    +                           value = serializedValue.deserializeValue(classLoader);
    +                   } catch (Exception e) {
    +                           throw new RuntimeException("Attempted to deserialize serialized data " +
    +                                   " with class loader " + classLoader + " failed. You probably forgot to" +
    +                                   " deserialize using a custom Classloader via deserialize(Classloader).", e);
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Gets the current value or deserializes it uses the current Classloader.
    +    * @return the value of type T
    +     */
    +   public T get() {
    +           if (value == null) {
    +                   deserialize(getClass().getClassLoader());
    +           }
    +           return value;
    +   }
    +
    +   /**
    +    * Updates the current stored value.
    +    * @param value The new value of type T
    +     */
    +   public void update(T value) {
    +           Preconditions.checkNotNull(value, "Serializable value must not be null.");
    +           this.value = value;
    +   }
    +
    +
    +   @Override
    +   public boolean equals(Object o) {
    +           if (this == o) {
    +                   return true;
    +           }
    +           if (o == null || getClass() != o.getClass()) {
    +                   return false;
    +           }
    +
    +           SerializableCacheableValue<?> that = (SerializableCacheableValue<?>) o;
    +
    +           try {
    +                   // serialize for equality check
    +                   serialize();
    +           } catch (IOException e) {
    +                   throw new RuntimeException("Error while serializing for equality check.");
    +           }
    +
    +           return serializedValue != null && serializedValue.equals(that.serializedValue);
    --- End diff --
    
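    What if `that` has not been serialized? Only `this` calls `serialize()` before the comparison, so `that.serializedValue` may still be `null` (or stale), and two instances holding equal values would then compare as unequal.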
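
    To make the concern concrete, here is a minimal sketch. It assumes a simplified stand-in class: `CachedValue`, `equalsAsInDiff`, and `equalsBothSerialized` are hypothetical names, and a plain `byte[]` replaces `SerializedValue<T>`. It is not the code from the pull request, only an illustration of the asymmetry and of one possible way to address it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class EqualsSketch {

    /** Hypothetical, simplified stand-in for SerializableCacheableValue. */
    static class CachedValue<T extends Serializable> {
        private T value;            // live value (transient in the class under review)
        private byte[] serialized;  // assumption: stands in for SerializedValue<T>; null until serialize() runs

        CachedValue(T value) {
            this.value = value;
        }

        /** Refreshes the serialized form from the live value, if there is one. */
        void serialize() {
            if (value == null) {
                return;
            }
            try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                 ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
                out.flush();
                serialized = bytes.toByteArray();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        /** Mirrors the comparison in the diff: only this side is serialized. */
        boolean equalsAsInDiff(CachedValue<?> that) {
            serialize();
            return serialized != null && Arrays.equals(serialized, that.serialized);
        }

        /** One possible fix: bring both sides up to date before comparing. */
        boolean equalsBothSerialized(CachedValue<?> that) {
            serialize();
            that.serialize();
            return serialized != null && Arrays.equals(serialized, that.serialized);
        }
    }

    public static void main(String[] args) {
        CachedValue<String> a = new CachedValue<>("flink");
        CachedValue<String> b = new CachedValue<>("flink");

        // b has never been serialized, so its serialized form is still null.
        System.out.println(a.equalsAsInDiff(b));       // false
        System.out.println(a.equalsBothSerialized(b)); // true
    }
}
```

    Serializing `that` inside `equals` mutates the argument, which may or may not be acceptable here; comparing the results of `get()` on both sides would be another option to consider.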

