[
https://issues.apache.org/jira/browse/FLINK-3701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15278202#comment-15278202
]
ASF GitHub Bot commented on FLINK-3701:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/1913#discussion_r62684204
--- Diff:
flink-core/src/main/java/org/apache/flink/util/SerializableCacheableValue.java
---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * A data structure that enables to keep a value which may be
serialized/deserialized using a custom
+ * Classloader. The encapsulated value is kept locally for further use
after a copy of the class has
+ * been serialized and shipped somewhere else.
+ *
+ * This is useful if we want to keep working with a value which may
require a custom classloader in
+ * one context but is fine with the system/context Classloader in other
cases. The value is cached
+ * as long as possible to prevent unnecessary
serialization/deserialization.
+ */
+public class SerializableCacheableValue<T> implements Serializable {
+
+ /** The current cached value. Lost when serialized. */
+ private transient T value;
+
+ private SerializedValue<T> serializedValue;
+
+ public SerializableCacheableValue(T value) {
+ update(value);
+ }
+
+ /**
+ * Custom serialization methods which always writes the latest value.
+ */
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ // trigger serialization once more to update to the least
recent value
+ serialize();
+ out.defaultWriteObject();
+ }
+
+ /**
+ * Serialization, e.g. before shipping the class
+ */
+ public void serialize() throws IOException {
+ if (value != null) {
+ serializedValue = new SerializedValue<>(value);
+ }
+ }
+
+ /**
+ * Explicit deserialiation using a provided class loader.
+ * @param classLoader The class loader to use
+ */
+ public void deserialize(ClassLoader classLoader) {
+ if (serializedValue != null) {
+ try {
+ value =
serializedValue.deserializeValue(classLoader);
+ } catch (Exception e) {
+ throw new RuntimeException("Attempted to
deserialize serialized data " +
+ " with class loader " + classLoader + "
failed. You probably forgot to" +
+ " deserialize using a custom
Classloader via deserialize(Classloader).", e);
+ }
+ }
+ }
+
+ /**
+ * Gets the current value or deserializes it uses the current
Classloader.
+ * @return the value of type T
+ */
+ public T get() {
+ if (value == null) {
+ deserialize(getClass().getClassLoader());
+ }
+ return value;
+ }
+
+ /**
+ * Updates the current stored value.
+ * @param value The new value of type T
+ */
+ public void update(T value) {
+ Preconditions.checkNotNull(value, "Serializable value must not
be null.");
+ this.value = value;
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SerializableCacheableValue<?> that =
(SerializableCacheableValue<?>) o;
+
+ try {
+ // serialize for equality check
+ serialize();
+ } catch (IOException e) {
+ throw new RuntimeException("Error while serializing for
equality check.");
+ }
+
+ return serializedValue != null &&
serializedValue.equals(that.serializedValue);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = value != null ? value.hashCode() : 0;
+ result = 31 * result + serializedValue.hashCode();
--- End diff --
To avoid these kind of errors, I think it helps a lot to add annotations
like `@Nullable` to the fields that can be null. Then IntelliJ should give
warnings about null pointers.
> Cant call execute after first execution
> ---------------------------------------
>
> Key: FLINK-3701
> URL: https://issues.apache.org/jira/browse/FLINK-3701
> Project: Flink
> Issue Type: Bug
> Components: Scala Shell
> Affects Versions: 1.1.0
> Reporter: Nikolaas Steenbergen
> Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>
> in the scala shell, local mode, version 1.0 this works:
> {code}
> Scala-Flink> var b = env.fromElements("a","b")
> Scala-Flink> b.print
> Scala-Flink> var c = env.fromElements("c","d")
> Scala-Flink> c.print
> {code}
> in the current master (after c.print) this leads to :
> {code}
> java.lang.NullPointerException
> at
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1031)
> at
> org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
> at
> org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:70)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
> at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
> at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1615)
> at .<init>(<console>:56)
> at .<clinit>(<console>)
> at .<init>(<console>:7)
> at .<clinit>(<console>)
> at $print(<console>)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
> at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
> at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
> at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
> at
> scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
> at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
> at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
> at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
> at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
> at
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
> at
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> at
> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
> at
> org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:199)
> at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:127)
> at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)