[FLINK-1085] [runtime] Combiner forwards oversized records, rather than failing on them.

This closes #854


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72718811
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72718811
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72718811

Branch: refs/heads/master
Commit: 7271881163d240ad1106a77036dce981dafb82f3
Parents: 7761ddb
Author: dabaitu <[email protected]>
Authored: Sat Jun 20 15:35:48 2015 -0700
Committer: Stephan Ewen <[email protected]>
Committed: Mon Jul 13 16:29:38 2015 +0200

----------------------------------------------------------------------
 .../operators/GroupReduceCombineDriver.java     | 53 +++++++++-----------
 .../runtime/operators/CombineTaskTest.java      | 48 ++++++++++++++----
 .../java/org/apache/flink/yarn/UtilsTest.java   |  2 +-
 3 files changed, 63 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/72718811/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index 493eb4f..c426295 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-import java.io.IOException;
 import java.util.List;
 
 /**
@@ -79,6 +78,8 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 
        private Collector<OUT> output;
 
+       private long oversizedRecordCount = 0L;
+
        private volatile boolean running = true;
 
        private boolean objectReuseEnabled = false;
@@ -142,7 +143,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
                this.objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("GroupReduceCombineDriver object reuse: " + 
(this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
+                       LOG.debug("GroupReduceCombineDriver object reuse: {}.", 
(this.objectReuseEnabled ? "ENABLED" : "DISABLED"));
                }
        }
 
@@ -170,7 +171,10 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 
                        // write the value again
                        if (!this.sorter.write(value)) {
-                               throw new IOException("Cannot write record to 
fresh sort buffer. Record too large.");
+                               ++oversizedRecordCount;
+                               LOG.debug("Cannot write record to fresh sort 
buffer. Record too large. Oversized record count: {}", oversizedRecordCount);
+                               // simply forward the record
+                               this.output.collect((OUT)value);
                        }
                }
 
@@ -179,39 +183,28 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
        }
 
        private void sortAndCombine() throws Exception {
+               if (sorter.isEmpty()) {
+                       return;
+               }
+
                final InMemorySorter<IN> sorter = this.sorter;
+               this.sortAlgo.sort(sorter);
+               final GroupCombineFunction<IN, OUT> combiner = this.combiner;
+               final Collector<OUT> output = this.output;
 
+               // iterate over key groups
                if (objectReuseEnabled) {
-                       if (!sorter.isEmpty()) {
-                               this.sortAlgo.sort(sorter);
-
-                               final ReusingKeyGroupedIterator<IN> keyIter = 
-                                               new 
ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, 
this.groupingComparator);
-
-                               final GroupCombineFunction<IN, OUT> combiner = 
this.combiner;
-                               final Collector<OUT> output = this.output;
-
-                               // iterate over key groups
-                               while (this.running && keyIter.nextKey()) {
-                                       combiner.combine(keyIter.getValues(), 
output);
-                               }
+                       final ReusingKeyGroupedIterator<IN> keyIter =
+                                       new 
ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, 
this.groupingComparator);
+                       while (this.running && keyIter.nextKey()) {
+                               combiner.combine(keyIter.getValues(), output);
                        }
                } else {
-                       if (!sorter.isEmpty()) {
-                               this.sortAlgo.sort(sorter);
-
-                               final NonReusingKeyGroupedIterator<IN> keyIter 
= 
-                                               new 
NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
-
-                               final GroupCombineFunction<IN, OUT> combiner = 
this.combiner;
-                               final Collector<OUT> output = this.output;
-
-                               // iterate over key groups
-                               while (this.running && keyIter.nextKey()) {
-                                       combiner.combine(keyIter.getValues(), 
output);
-                               }
+                       final NonReusingKeyGroupedIterator<IN> keyIter = 
+                                       new 
NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
+                       while (this.running && keyIter.nextKey()) {
+                               combiner.combine(keyIter.getValues(), output);
                        }
-
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/72718811/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index 3d9e991..7772151 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -19,23 +19,21 @@
 package org.apache.flink.runtime.operators;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.operators.testutils.*;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import 
org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DriverTestBase;
-import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
-import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.runtime.operators.testutils.TestData.Generator;
 import org.junit.Test;
 
 public class CombineTaskTest extends 
DriverTestBase<RichGroupReduceFunction<Record, ?>>
@@ -65,7 +63,7 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
                addDriverComparator(this.comparator);
                addDriverComparator(this.comparator);
                setOutput(this.outList);
-               
+
                
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
                getTaskConfig().setRelativeMemoryDriver(combine_frac);
                getTaskConfig().setFilehandlesDriver(2);
@@ -92,7 +90,39 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
                
                this.outList.clear();
        }
-       
+
+       @Test
+       public void testOversizedRecordCombineTask() {
+               int tenMil = 10000000;
+               Generator g = new Generator(561349061987311L, 1, tenMil);
+               //generate 10 records each of size 10MB
+               final TestData.GeneratorIterator gi = new 
TestData.GeneratorIterator(g, 10);
+               List<MutableObjectIterator<Record>> inputs = new 
ArrayList<MutableObjectIterator<Record>>();
+               inputs.add(gi);
+
+               addInput(new UnionIterator<Record>(inputs));
+               addDriverComparator(this.comparator);
+               addDriverComparator(this.comparator);
+               setOutput(this.outList);
+
+               
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
+               getTaskConfig().setRelativeMemoryDriver(combine_frac);
+               getTaskConfig().setFilehandlesDriver(2);
+
+               final GroupReduceCombineDriver<Record, Record> testTask = new 
GroupReduceCombineDriver<Record, Record>();
+
+               try {
+                       testDriver(testTask, MockCombiningReduceStub.class);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("Invoke method caused exception.");
+               }
+
+               Assert.assertTrue("Resultset size was "+this.outList.size()+". 
Expected was "+10, this.outList.size() == 10);
+
+               this.outList.clear();
+       }
+
        @Test
        public void testFailingCombineTask() {
                int keyCnt = 100;
@@ -119,7 +149,7 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
                        Assert.fail("Test failed due to an exception.");
                }
        }
-       
+
        @Test
        public void testCancelCombineTaskSorting()
        {

http://git-wip-us.apache.org/repos/asf/flink/blob/72718811/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
index 25a1413..9ee60a5 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
@@ -36,8 +36,8 @@ public class UtilsTest {
        @Test
        public void testUberjarLocator() {
                File dir = YarnTestBase.findFile(".", new 
YarnTestBase.RootDirFilenameFilter());
-               Assert.assertTrue(dir.getName().endsWith(".jar"));
                Assert.assertNotNull(dir);
+               Assert.assertTrue(dir.getName().endsWith(".jar"));
                dir = dir.getParentFile().getParentFile(); // from uberjar to 
lib to root
                Assert.assertTrue(dir.exists());
                Assert.assertTrue(dir.isDirectory());

Reply via email to