[hotfix][runtime-tests] Deduplicate CollectingResultPartitionWriters classes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b24757e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b24757e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b24757e

Branch: refs/heads/master
Commit: 6b24757efd50e4e6da8db72f885e49fa5a465047
Parents: 433e05c
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Tue Jan 23 17:28:13 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:22 2018 +0100

----------------------------------------------------------------------
 ...AbstractCollectingResultPartitionWriter.java | 68 ++++++++++++++++++++
 .../RecordCollectingResultPartitionWriter.java  | 31 +--------
 ...dOrEventCollectingResultPartitionWriter.java | 32 +--------
 3 files changed, 74 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b24757e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
new file mode 100644
index 0000000..49a211e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link ResultPartitionWriter} that collects output on the List.
+ */
+public abstract class AbstractCollectingResultPartitionWriter implements 
ResultPartitionWriter {
+       private final BufferProvider bufferProvider;
+
+       public AbstractCollectingResultPartitionWriter(BufferProvider 
bufferProvider) {
+               this.bufferProvider = checkNotNull(bufferProvider);
+       }
+
+       @Override
+       public BufferProvider getBufferProvider() {
+               return bufferProvider;
+       }
+
+       @Override
+       public ResultPartitionID getPartitionId() {
+               return new ResultPartitionID();
+       }
+
+       @Override
+       public int getNumberOfSubpartitions() {
+               return 1;
+       }
+
+       @Override
+       public int getNumTargetKeyGroups() {
+               return 1;
+       }
+
+       @Override
+       public void writeBuffer(Buffer buffer, int subpartitionIndex) throws 
IOException {
+               deserializeBuffer(buffer);
+       }
+
+       protected abstract void deserializeBuffer(Buffer buffer) throws 
IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b24757e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
index 24ccae1..6356f4f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
@@ -22,54 +22,29 @@ import 
org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRec
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.types.Record;
 
 import java.io.IOException;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * {@link ResultPartitionWriter} that collects output on the List.
  */
-public class RecordCollectingResultPartitionWriter implements 
ResultPartitionWriter {
+public class RecordCollectingResultPartitionWriter extends 
AbstractCollectingResultPartitionWriter {
        private final List<Record> output;
-       private final BufferProvider bufferProvider;
 
        private final Record record = new Record();
        private final RecordDeserializer<Record> deserializer = new 
AdaptiveSpanningRecordDeserializer<>();
 
        public RecordCollectingResultPartitionWriter(List<Record> output, 
BufferProvider bufferProvider) {
+               super(bufferProvider);
                this.output = checkNotNull(output);
-               this.bufferProvider = checkNotNull(bufferProvider);
        }
 
        @Override
-       public BufferProvider getBufferProvider() {
-               return bufferProvider;
-       }
-
-       @Override
-       public ResultPartitionID getPartitionId() {
-               return new ResultPartitionID();
-       }
-
-       @Override
-       public int getNumberOfSubpartitions() {
-               return 1;
-       }
-
-       @Override
-       public int getNumTargetKeyGroups() {
-               return 1;
-       }
-
-       @Override
-       public void writeBuffer(Buffer buffer, int targetChannel) throws 
IOException {
-               checkState(targetChannel < getNumberOfSubpartitions());
-
+       protected void deserializeBuffer(Buffer buffer) throws IOException {
                deserializer.setNextBuffer(buffer);
 
                while (deserializer.hasUnfinishedData()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6b24757e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
index c9ec6df..d1b4570 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 
@@ -33,14 +32,12 @@ import java.io.IOException;
 import java.util.Collection;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * {@link ResultPartitionWriter} that collects records or events on the List.
  */
-public class RecordOrEventCollectingResultPartitionWriter<T> implements 
ResultPartitionWriter {
+public class RecordOrEventCollectingResultPartitionWriter<T> extends 
AbstractCollectingResultPartitionWriter {
        private final Collection<Object> output;
-       private final BufferProvider bufferProvider;
        private final NonReusingDeserializationDelegate<T> delegate;
        private final RecordDeserializer<DeserializationDelegate<T>> 
deserializer = new AdaptiveSpanningRecordDeserializer<>();
 
@@ -48,36 +45,13 @@ public class 
RecordOrEventCollectingResultPartitionWriter<T> implements ResultPa
                        Collection<Object> output,
                        BufferProvider bufferProvider,
                        TypeSerializer<T> serializer) {
-
+               super(bufferProvider);
                this.output = checkNotNull(output);
-               this.bufferProvider = checkNotNull(bufferProvider);
                this.delegate = new 
NonReusingDeserializationDelegate<>(checkNotNull(serializer));
        }
 
        @Override
-       public BufferProvider getBufferProvider() {
-               return bufferProvider;
-       }
-
-       @Override
-       public ResultPartitionID getPartitionId() {
-               return new ResultPartitionID();
-       }
-
-       @Override
-       public int getNumberOfSubpartitions() {
-               return 1;
-       }
-
-       @Override
-       public int getNumTargetKeyGroups() {
-               return 1;
-       }
-
-       @Override
-       public void writeBuffer(Buffer buffer, int targetChannel) throws 
IOException {
-               checkState(targetChannel < getNumberOfSubpartitions());
-
+       protected void deserializeBuffer(Buffer buffer) throws IOException {
                if (buffer.isBuffer()) {
                        deserializer.setNextBuffer(buffer);
 

Reply via email to