[ 
https://issues.apache.org/jira/browse/FLINK-3701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15278202#comment-15278202
 ] 

ASF GitHub Bot commented on FLINK-3701:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1913#discussion_r62684204
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/util/SerializableCacheableValue.java 
---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.util;
    +
    +
    +import java.io.IOException;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +/**
    + * A data structure that enables to keep a value which may be 
serialized/deserialized using a custom
    + * Classloader. The encapsulated value is kept locally for further use 
after a copy of the class has
    + * been serialized and shipped somewhere else.
    + *
    + * This is useful if we want to keep working with a value which may 
require a custom classloader in
    + * one context but is fine with the system/context Classloader in other 
cases. The value is cached
    + * as long as possible to prevent unnecessary 
serialization/deserialization.
    + */
    +public class SerializableCacheableValue<T> implements Serializable {
    +
    +   /** The current cached value. Lost when serialized. */
    +   private transient T value;
    +
    +   private SerializedValue<T> serializedValue;
    +
    +   public SerializableCacheableValue(T value) {
    +           update(value);
    +   }
    +
    +   /**
    +    * Custom serialization methods which always writes the latest value.
    +     */
    +   private void writeObject(ObjectOutputStream out) throws IOException {
    +           // trigger serialization once more to update to the least 
recent value
    +           serialize();
    +           out.defaultWriteObject();
    +   }
    +
    +   /**
    +    * Serialization, e.g. before shipping the class
    +    */
    +   public void serialize() throws IOException {
    +           if (value != null) {
    +                   serializedValue = new SerializedValue<>(value);
    +           }
    +   }
    +
    +   /**
    +    * Explicit deserialiation using a provided class loader.
    +    * @param classLoader The class loader to use
    +    */
    +   public void deserialize(ClassLoader classLoader) {
    +           if (serializedValue != null) {
    +                   try {
    +                           value = 
serializedValue.deserializeValue(classLoader);
    +                   } catch (Exception e) {
    +                           throw new RuntimeException("Attempted to 
deserialize serialized data " +
    +                                   " with class loader " + classLoader + " 
failed. You probably forgot to" +
    +                                   " deserialize using a custom 
Classloader via deserialize(Classloader).", e);
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Gets the current value or deserializes it uses the current 
Classloader.
    +    * @return the value of type T
    +     */
    +   public T get() {
    +           if (value == null) {
    +                   deserialize(getClass().getClassLoader());
    +           }
    +           return value;
    +   }
    +
    +   /**
    +    * Updates the current stored value.
    +    * @param value The new value of type T
    +     */
    +   public void update(T value) {
    +           Preconditions.checkNotNull(value, "Serializable value must not 
be null.");
    +           this.value = value;
    +   }
    +
    +
    +   @Override
    +   public boolean equals(Object o) {
    +           if (this == o) {
    +                   return true;
    +           }
    +           if (o == null || getClass() != o.getClass()) {
    +                   return false;
    +           }
    +
    +           SerializableCacheableValue<?> that = 
(SerializableCacheableValue<?>) o;
    +
    +           try {
    +                   // serialize for equality check
    +                   serialize();
    +           } catch (IOException e) {
    +                   throw new RuntimeException("Error while serializing for 
equality check.");
    +           }
    +
    +           return serializedValue != null && 
serializedValue.equals(that.serializedValue);
    +   }
    +
    +   @Override
    +   public int hashCode() {
    +           int result = value != null ? value.hashCode() : 0;
    +           result = 31 * result + serializedValue.hashCode();
    --- End diff --
    
    To avoid these kind of errors, I think it helps a lot to add annotations 
like `@Nullable` to the fields that can be null. Then IntelliJ should give 
warnings about null pointers.


> Cant call execute after first execution
> ---------------------------------------
>
>                 Key: FLINK-3701
>                 URL: https://issues.apache.org/jira/browse/FLINK-3701
>             Project: Flink
>          Issue Type: Bug
>          Components: Scala Shell
>    Affects Versions: 1.1.0
>            Reporter: Nikolaas Steenbergen
>            Assignee: Maximilian Michels
>             Fix For: 1.1.0
>
>
> in the scala shell, local mode, version 1.0 this works:
> {code}
> Scala-Flink> var b = env.fromElements("a","b")
> Scala-Flink> b.print
> Scala-Flink> var c = env.fromElements("c","d")
> Scala-Flink> c.print
> {code}
> in the current master (after c.print) this leads to :
> {code}
> java.lang.NullPointerException
>       at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1031)
>       at 
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
>       at 
> org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:70)
>       at 
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>       at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>       at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>       at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1615)
>       at .<init>(<console>:56)
>       at .<clinit>(<console>)
>       at .<init>(<console>:7)
>       at .<clinit>(<console>)
>       at $print(<console>)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>       at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>       at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>       at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>       at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>       at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>       at 
> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>       at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>       at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>       at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>       at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>       at 
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>       at 
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>       at 
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>       at 
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>       at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>       at 
> org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:199)
>       at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:127)
>       at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to