[
https://issues.apache.org/jira/browse/FLINK-4719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535535#comment-15535535
]
Flavio Pompermaier commented on FLINK-4719:
-------------------------------------------
With the Flink 1.1.1 code my job fails frequently; using the following
code for the KryoSerializer instead greatly reduces the frequency of such exceptions.
I hope this helps in solving the problem:
{code:java}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime.kryo;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.avro.generic.GenericData;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
import
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A type serializer that serializes its type using the Kryo serialization
* framework (https://github.com/EsotericSoftware/kryo).
*
* This serializer is intended as a fallback serializer for the cases that are
* not covered by the basic types, tuples, and POJOs.
*
* @param <T> The type to be serialized.
*/
public class KryoSerializer<T> extends TypeSerializer<T> {
private static final long serialVersionUID = 3L;
private static final Logger LOG =
LoggerFactory.getLogger(KryoSerializer.class);
//
------------------------------------------------------------------------
private final LinkedHashMap<Class<?>,
ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
registeredTypesWithSerializerClasses;
private final LinkedHashMap<Class<?>,
ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
defaultSerializerClasses;
private final LinkedHashSet<Class<?>> registeredTypes;
private final Class<T> type;
//
------------------------------------------------------------------------
// The fields below are lazily initialized after duplication or
deserialization.
private transient Kryo kryo;
private transient T copyInstance;
//
------------------------------------------------------------------------
public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
this.type = checkNotNull(type);
this.defaultSerializers =
executionConfig.getDefaultKryoSerializers();
this.defaultSerializerClasses =
executionConfig.getDefaultKryoSerializerClasses();
this.registeredTypesWithSerializers =
executionConfig.getRegisteredTypesWithKryoSerializers();
this.registeredTypesWithSerializerClasses =
executionConfig.getRegisteredTypesWithKryoSerializerClasses();
this.registeredTypes = executionConfig.getRegisteredKryoTypes();
}
/**
* Copy-constructor that does not copy transient fields. They will be
initialized once required.
*/
protected KryoSerializer(KryoSerializer<T> toCopy) {
registeredTypesWithSerializers =
toCopy.registeredTypesWithSerializers;
registeredTypesWithSerializerClasses =
toCopy.registeredTypesWithSerializerClasses;
defaultSerializers = toCopy.defaultSerializers;
defaultSerializerClasses = toCopy.defaultSerializerClasses;
registeredTypes = toCopy.registeredTypes;
type = toCopy.type;
if(type == null){
throw new NullPointerException("Type class cannot be
null.");
}
}
//
------------------------------------------------------------------------
@Override
public boolean isImmutableType() {
return false;
}
@Override
public KryoSerializer<T> duplicate() {
return new KryoSerializer<T>(this);
}
@Override
public T createInstance() {
if(Modifier.isAbstract(type.getModifiers()) ||
Modifier.isInterface(type.getModifiers()) ) {
return null;
} else {
checkKryoInitialized();
try {
return kryo.newInstance(type);
} catch(Throwable e) {
return null;
}
}
}
@SuppressWarnings("unchecked")
@Override
public T copy(T from) {
if (from == null) {
return null;
}
checkKryoInitialized();
try {
return kryo.copy(from);
}
catch(KryoException ke) {
// kryo was unable to copy it, so we do it through
serialization:
ByteArrayOutputStream baout = new
ByteArrayOutputStream();
Output output = new Output(baout);
kryo.writeObject(output, from);
output.close();
ByteArrayInputStream bain = new
ByteArrayInputStream(baout.toByteArray());
Input input = new Input(bain);
T ret = (T)kryo.readObject(input, from.getClass());
input.close();
return ret;
}
}
@Override
public T copy(T from, T reuse) {
return copy(from);
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(T record, DataOutputView target) throws
IOException {
checkKryoInitialized();
DataOutputViewStream outputStream = new
DataOutputViewStream(target);
Output output = new Output(outputStream);
try {
kryo.writeClassAndObject(output, record);
}
catch (KryoException ke) {
Throwable cause = ke.getCause();
if (cause instanceof EOFException) {
throw (EOFException) cause;
}
else {
throw ke;
}
} finally {
try{
output.close();
} catch (KryoException ke) {
Throwable cause = ke.getCause();
if (cause instanceof
EOFException) {
throw (EOFException)
cause;
} else {
throw ke;
}
}
}
}
@SuppressWarnings("unchecked")
@Override
public T deserialize(DataInputView source) throws IOException {
checkKryoInitialized();
DataInputViewStream inputStream = new
DataInputViewStream(source);
Input input = new NoFetchingInput(inputStream);
try {
return (T) kryo.readClassAndObject(input);
} catch (KryoException ke) {
Throwable cause = ke.getCause();
if (cause instanceof EOFException) {
throw (EOFException) cause;
} else {
throw ke;
}
} finally {
try{
input.close();
} catch (KryoException ke) {
Throwable cause = ke.getCause();
if (cause instanceof EOFException) {
throw (EOFException) cause;
} else {
throw ke;
}
}
}
}
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws
IOException {
checkKryoInitialized();
if(this.copyInstance == null){
this.copyInstance = createInstance();
}
T tmp = deserialize(copyInstance, source);
serialize(tmp, target);
}
//
--------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return Objects.hash(
type,
registeredTypes,
registeredTypesWithSerializerClasses,
defaultSerializerClasses);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof KryoSerializer) {
KryoSerializer<?> other = (KryoSerializer<?>) obj;
// we cannot include the Serializers here because they
don't implement the equals method
return other.canEqual(this) &&
type == other.type &&
registeredTypes.equals(other.registeredTypes) &&
registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses)
&&
defaultSerializerClasses.equals(other.defaultSerializerClasses);
} else {
return false;
}
}
@Override
public boolean canEqual(Object obj) {
return obj instanceof KryoSerializer;
}
//
--------------------------------------------------------------------------------------------
/**
* Returns the Chill Kryo Serializer which is implictly added to the
classpath via flink-runtime.
* Falls back to the default Kryo serializer if it can't be found.
* @return The Kryo serializer instance.
*/
private Kryo getKryoInstance() {
try {
// check if ScalaKryoInstantiator is in class path
(coming from Twitter's Chill library).
// This will be true if Flink's Scala API is used.
Class<?> chillInstantiatorClazz =
Class.forName("com.twitter.chill.ScalaKryoInstantiator");
Object chillInstantiator =
chillInstantiatorClazz.newInstance();
// obtain a Kryo instance through Twitter Chill
Method m = chillInstantiatorClazz.getMethod("newKryo");
return (Kryo) m.invoke(chillInstantiator);
} catch (ClassNotFoundException | InstantiationException |
NoSuchMethodException |
IllegalAccessException | InvocationTargetException e) {
LOG.warn("Falling back to default Kryo serializer
because Chill serializer couldn't be found.", e);
Kryo.DefaultInstantiatorStrategy initStrategy = new
Kryo.DefaultInstantiatorStrategy();
initStrategy.setFallbackInstantiatorStrategy(new
StdInstantiatorStrategy());
Kryo kryo = new Kryo();
kryo.setInstantiatorStrategy(initStrategy);
return kryo;
}
}
private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = getKryoInstance();
// Enable reference tracking.
kryo.setReferences(true);
// Throwable and all subclasses should be serialized
via java serialization
kryo.addDefaultSerializer(Throwable.class, new
JavaSerializer());
// Add default serializers first, so that they type
registrations without a serializer
// are registered with a default serializer
for (Map.Entry<Class<?>,
ExecutionConfig.SerializableSerializer<?>> entry:
defaultSerializers.entrySet()) {
kryo.addDefaultSerializer(entry.getKey(),
entry.getValue().getSerializer());
}
for (Map.Entry<Class<?>, Class<? extends
Serializer<?>>> entry: defaultSerializerClasses.entrySet()) {
kryo.addDefaultSerializer(entry.getKey(),
entry.getValue());
}
// register the type of our class
kryo.register(type);
// register given types. we do this first so that any
registration of a
// more specific serializer overrides this
for (Class<?> type : registeredTypes) {
kryo.register(type);
}
// register given serializer classes
for (Map.Entry<Class<?>, Class<? extends
Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
Class<?> typeClass = e.getKey();
Class<? extends Serializer<?>> serializerClass
= e.getValue();
Serializer<?> serializer =
ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
kryo.register(typeClass, serializer);
}
// register given serializers
for (Map.Entry<Class<?>,
ExecutionConfig.SerializableSerializer<?>> e :
registeredTypesWithSerializers.entrySet()) {
kryo.register(e.getKey(),
e.getValue().getSerializer());
}
// this is needed for Avro but can not be added on
demand.
kryo.register(GenericData.Array.class, new
SpecificInstanceCollectionSerializerForArrayList());
kryo.setRegistrationRequired(false);
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
}
}
//
--------------------------------------------------------------------------------------------
// For testing
//
--------------------------------------------------------------------------------------------
public Kryo getKryo() {
checkKryoInitialized();
return this.kryo;
}
}
{code}
> KryoSerializer random exception
> -------------------------------
>
> Key: FLINK-4719
> URL: https://issues.apache.org/jira/browse/FLINK-4719
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.1.1
> Reporter: Flavio Pompermaier
> Labels: kryo, serialization
>
> There's a random exception that somehow involves the KryoSerializer when
> using POJOs in Flink jobs reading large volumes of data.
> It can be thrown in several different places, e.g. (the exceptions reported here
> may refer to previous versions of Flink...):
> {code}
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at
> main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted
> input: Thread 'SortMerger spilling thread' terminated due to an exception:
> Unable to find class: java.ttil.HashSet
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception: Unable to
> find class: java.ttil.HashSet
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
> at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: Unable to find class: java.ttil.HashSet
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class:
> java.ttil.HashSet
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
> at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> {code}
> {code}
> Caused by: java.io.IOException: Serializer consumed more bytes than the
> record had. This indicates broken serialization. If you are using custom
> serialization types (Value or Writable), check their serialization methods.
> If you are using a Kryo-serialized type, check the corresponding Kryo
> serializer.
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
> at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
> at java.util.ArrayList.elementData(ArrayList.java:418)
> at java.util.ArrayList.get(ArrayList.java:431)
> at
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> {code}
> {code}
> java.lang.RuntimeException: Cannot instantiate class.
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
> at
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> it.okkam.flink.test.model.pojo.VdhicleEvent
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
> ... 10 more
> {code}
> {code}
> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
> at
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> {code}
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
> at java.util.ArrayList.elementData(ArrayList.java:418)
> at java.util.ArrayList.get(ArrayList.java:431)
> at
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
> at
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
> at
> org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
> at
> org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
> at
> org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:45)
> at
> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
> at
> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
> at
> org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
> at
> org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:64)
> at
> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)