This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 76b299359fcd6463d1e7c46fc64bfd42f787daef
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue Jun 18 15:57:02 2019 +0200

    [FLINK-12777][network] Introduce LinkedBufferStorage class
---
 .../streaming/runtime/io/LinkedBufferStorage.java  |  92 ++++++++++++
 .../runtime/io/BufferStorageTestBase.java          |   4 +
 .../runtime/io/LinkedBufferStorageTest.java        | 165 +++++++++++++++++++++
 3 files changed, 261 insertions(+)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorage.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorage.java
new file mode 100644
index 0000000..aac2ba6
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorage.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Implementation of {@link BufferStorage} that links two {@link 
BufferStorage} together.
+ * Each of the linked {@link BufferStorage} will store buffers independently, 
but they will be
+ * linked together for {@link #rollOver()} - if one is rolled over, other will 
do that as well.
+ *
+ * <p>Note that only {@code mainStorage} is closed when {@link 
LinkedBufferStorage} instance is closed.
+ */
+public class LinkedBufferStorage implements BufferStorage {
+
+       private final BufferStorage mainStorage;
+
+       private final BufferStorage linkedStorage;
+
+       private long maxBufferedBytes;
+
+       public LinkedBufferStorage(BufferStorage mainStorage, BufferStorage 
linkedStorage, long maxBufferedBytes) {
+               this.mainStorage = mainStorage;
+               this.linkedStorage = linkedStorage;
+               this.maxBufferedBytes = maxBufferedBytes;
+       }
+
+       @Override
+       public void add(BufferOrEvent boe) throws IOException {
+               mainStorage.add(boe);
+       }
+
+       @Override
+       public boolean isFull() {
+               return maxBufferedBytes > 0 && (getRolledBytes() + 
getPendingBytes()) > maxBufferedBytes;
+       }
+
+       @Override
+       public void rollOver() throws IOException {
+               mainStorage.rollOver();
+               linkedStorage.rollOver();
+       }
+
+       @Override
+       public long getPendingBytes() {
+               return mainStorage.getPendingBytes() + 
linkedStorage.getPendingBytes();
+       }
+
+       @Override
+       public long getRolledBytes() {
+               return mainStorage.getRolledBytes() + 
linkedStorage.getRolledBytes();
+       }
+
+       @Override
+       public boolean isEmpty() {
+               return mainStorage.isEmpty();
+       }
+
+       @Override
+       public Optional<BufferOrEvent> pollNext() throws IOException {
+               return mainStorage.pollNext();
+       }
+
+       @Override
+       public long getMaxBufferedBytes() {
+               return maxBufferedBytes;
+       }
+
+       @Override
+       public void close() throws IOException {
+               mainStorage.close();
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
index b23d3e9..1e219a5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
@@ -244,6 +244,10 @@ public abstract class BufferStorageTestBase {
                return new BufferOrEvent(evt, channelIndex);
        }
 
+       public static BufferOrEvent generateRandomBuffer(int size) {
+               return generateRandomBuffer(size, 0);
+       }
+
        public static BufferOrEvent generateRandomBuffer(int size, int 
channelIndex) {
                MemorySegment seg = 
MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
                for (int i = 0; i < size; i++) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorageTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorageTest.java
new file mode 100644
index 0000000..1edae8d
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorageTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Optional;
+
+import static junit.framework.TestCase.assertFalse;
+import static 
org.apache.flink.streaming.runtime.io.BufferStorageTestBase.generateRandomBuffer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link LinkedBufferStorage}.
+ */
+public class LinkedBufferStorageTest {
+       private static final int PAGE_SIZE = 100;
+
+       private CachedBufferStorage mainStorage;
+
+       private CachedBufferStorage linkedStorage;
+
+       private LinkedBufferStorage bufferStorage;
+
+       @Before
+       public void setUp() {
+               mainStorage = new CachedBufferStorage(PAGE_SIZE);
+               linkedStorage = new CachedBufferStorage(PAGE_SIZE);
+               bufferStorage = new LinkedBufferStorage(
+                       mainStorage,
+                       linkedStorage,
+                       700);
+       }
+
+       @After
+       public void tearDown() throws IOException {
+               bufferStorage.close();
+               mainStorage.close();
+               linkedStorage.close();
+       }
+
+       @Test
+       public void testBasicUsage() throws IOException {
+               linkedStorage.add(generateRandomBuffer(PAGE_SIZE + 0));
+               assertEquals(PAGE_SIZE, bufferStorage.getPendingBytes());
+               assertTrue(bufferStorage.isEmpty());
+
+               bufferStorage.add(generateRandomBuffer(PAGE_SIZE + 1));
+               bufferStorage.add(generateRandomBuffer(PAGE_SIZE + 2));
+
+               assertTrue(bufferStorage.isEmpty());
+               assertEquals(mainStorage.getPendingBytes() + 
linkedStorage.getPendingBytes(), bufferStorage.getPendingBytes());
+               assertEquals(mainStorage.getRolledBytes() + 
linkedStorage.getRolledBytes(), bufferStorage.getRolledBytes());
+
+               assertTrue(bufferStorage.isEmpty());
+               assertTrue(linkedStorage.isEmpty());
+
+               bufferStorage.rollOver();
+
+               assertFalse(bufferStorage.isEmpty());
+               assertFalse(linkedStorage.isEmpty());
+
+               assertEquals(mainStorage.getPendingBytes() + 
linkedStorage.getPendingBytes(), bufferStorage.getPendingBytes());
+               assertEquals(mainStorage.getRolledBytes() + 
linkedStorage.getRolledBytes(), bufferStorage.getRolledBytes());
+
+               linkedStorage.add(generateRandomBuffer(PAGE_SIZE + 3));
+               bufferStorage.add(generateRandomBuffer(PAGE_SIZE + 4));
+
+               assertEquals(mainStorage.getPendingBytes() + 
linkedStorage.getPendingBytes(), bufferStorage.getPendingBytes());
+               assertEquals(mainStorage.getRolledBytes() + 
linkedStorage.getRolledBytes(), bufferStorage.getRolledBytes());
+
+               bufferStorage.rollOver();
+
+               assertEquals(mainStorage.getPendingBytes() + 
linkedStorage.getPendingBytes(), bufferStorage.getPendingBytes());
+               assertEquals(mainStorage.getRolledBytes() + 
linkedStorage.getRolledBytes(), bufferStorage.getRolledBytes());
+
+               ArrayList<Integer> bufferSizes = drain(bufferStorage);
+
+               assertEquals(PAGE_SIZE + 4, (long) bufferSizes.get(0));
+               assertEquals(PAGE_SIZE + 1, (long) bufferSizes.get(1));
+               assertEquals(PAGE_SIZE + 2, (long) bufferSizes.get(2));
+
+               bufferSizes = drain(linkedStorage);
+
+               assertEquals(PAGE_SIZE + 3, (long) bufferSizes.get(0));
+               assertEquals(PAGE_SIZE + 0, (long) bufferSizes.get(1));
+
+               assertEquals(0, bufferStorage.getRolledBytes());
+               assertEquals(0, bufferStorage.getPendingBytes());
+       }
+
+       @Test
+       public void testPendingIsFull() throws IOException {
+               linkedStorage.add(generateRandomBuffer(PAGE_SIZE));
+               linkedStorage.add(generateRandomBuffer(PAGE_SIZE));
+               bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+               bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+               bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+               bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+               bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+
+               assertFalse(bufferStorage.isFull());
+
+               bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+
+               assertTrue(bufferStorage.isFull());
+       }
+
+       /**
+        * This test is broken because of FLINK-12912.
+        * https://issues.apache.org/jira/browse/FLINK-12912
+        */
+       //@Test
+       public void testRolledIsFull() throws IOException {
+               linkedStorage.add(generateRandomBuffer(PAGE_SIZE));
+               linkedStorage.add(generateRandomBuffer(PAGE_SIZE));
+               bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+               bufferStorage.rollOver();
+               bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+               bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+               bufferStorage.rollOver();
+               linkedStorage.add(generateRandomBuffer(PAGE_SIZE));
+               bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+
+               assertFalse(bufferStorage.isFull());
+
+               bufferStorage.add(generateRandomBuffer(PAGE_SIZE));
+
+               assertTrue(bufferStorage.isFull());
+       }
+
+       private ArrayList<Integer> drain(BufferStorage bufferStorage) throws 
IOException {
+               ArrayList<Integer> result = new ArrayList<>();
+               while (!bufferStorage.isEmpty()) {
+                       Optional<BufferOrEvent> bufferOrEvent = 
bufferStorage.pollNext();
+                       if (bufferOrEvent.isPresent()) {
+                               result.add(bufferOrEvent.get().getSize());
+                       }
+               }
+               return result;
+       }
+}

Reply via email to