This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2ed38c12be7151ab49e9cf2b4e2d8138f1ae4c62 Author: Roman Khachatryan <[email protected]> AuthorDate: Thu May 7 16:48:47 2020 +0200 [FLINK-17547][task] Implement getUnconsumedSegment for spilled buffers --- .../flink/core/memory/MemorySegmentFactory.java | 28 ++++- .../org/apache/flink/util/CloseableIterator.java | 32 ++++++ .../core/memory/MemorySegmentFactoryTest.java | 64 ++++++++++++ .../apache/flink/util/CloseableIteratorTest.java | 82 +++++++++++++++ .../runtime/io/disk/FileBasedBufferIterator.java | 90 ++++++++++++++++ .../network/api/serialization/SpanningWrapper.java | 54 +++++++--- .../api/serialization/SpanningWrapperTest.java | 115 +++++++++++++++++++++ 7 files changed, 448 insertions(+), 17 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java index ee301a1..f643bc4 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java @@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * A factory for (hybrid) memory segments ({@link HybridMemorySegment}). * @@ -53,6 +55,31 @@ public final class MemorySegmentFactory { } /** + * Copies the given heap memory region and creates a new memory segment wrapping it. + * + * @param bytes The heap memory region. + * @param start starting position, inclusive + * @param end end position, exclusive + * @return A new memory segment that targets a copy of the given heap memory region. + * @throws IllegalArgumentException if start > end or end > bytes.length + */ + public static MemorySegment wrapCopy(byte[] bytes, int start, int end) throws IllegalArgumentException { + checkArgument(end >= start); + checkArgument(end <= bytes.length); + MemorySegment copy = allocateUnpooledSegment(end - start); + copy.put(0, bytes, start, copy.size()); + return copy; + } + + /** + * Wraps the four bytes representing the given number with a {@link MemorySegment}. + * @see ByteBuffer#putInt(int) + */ + public static MemorySegment wrapInt(int value) { + return wrap(ByteBuffer.allocate(Integer.BYTES).putInt(value).array()); + } + + /** * Allocates some unpooled memory and creates a new memory segment that represents * that memory. * @@ -161,5 +188,4 @@ public final class MemorySegmentFactory { public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) { return new HybridMemorySegment(memory, null); } - } diff --git a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java index cc51324..e0c5ec0 100644 --- a/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java +++ b/flink-core/src/main/java/org/apache/flink/util/CloseableIterator.java @@ -24,7 +24,9 @@ import java.util.ArrayDeque; import java.util.Collections; import java.util.Deque; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.function.Consumer; import static java.util.Arrays.asList; @@ -80,6 +82,36 @@ public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable { }; } + static <T> CloseableIterator<T> flatten(CloseableIterator<T>... iterators) { + return new CloseableIterator<T>() { + private final Queue<CloseableIterator<T>> queue = removeEmptyHead(new LinkedList<>(asList(iterators))); + + private Queue<CloseableIterator<T>> removeEmptyHead(Queue<CloseableIterator<T>> queue) { + while (!queue.isEmpty() && !queue.peek().hasNext()) { + queue.poll(); + } + return queue; + } + + @Override + public boolean hasNext() { + removeEmptyHead(queue); + return !queue.isEmpty(); + } + + @Override + public T next() { + removeEmptyHead(queue); + return queue.peek().next(); + } + + @Override + public void close() throws Exception { + IOUtils.closeAll(iterators); + } + }; + } + @SuppressWarnings("unchecked") static <T> CloseableIterator<T> empty() { return (CloseableIterator<T>) EMPTY_INSTANCE; diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java new file mode 100644 index 0000000..59c1d7e --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.memory; + +import org.junit.Test; + +import static java.lang.System.arraycopy; +import static org.junit.Assert.assertArrayEquals; + +/** + * {@link MemorySegmentFactory} test. + */ +public class MemorySegmentFactoryTest { + + @Test + public void testWrapCopyChangingData() { + byte[] data = {1, 2, 3, 4, 5}; + byte[] changingData = new byte[data.length]; + arraycopy(data, 0, changingData, 0, data.length); + MemorySegment segment = MemorySegmentFactory.wrapCopy(changingData, 0, changingData.length); + changingData[0]++; + assertArrayEquals(data, segment.heapMemory); + } + + @Test + public void testWrapPartialCopy() { + byte[] data = {1, 2, 3, 5, 6}; + MemorySegment segment = MemorySegmentFactory.wrapCopy(data, 0, data.length / 2); + byte[] exp = new byte[segment.size()]; + arraycopy(data, 0, exp, 0, exp.length); + assertArrayEquals(exp, segment.heapMemory); + } + + @Test + public void testWrapCopyEmpty() { + MemorySegmentFactory.wrapCopy(new byte[0], 0, 0); + } + + @Test(expected = IllegalArgumentException.class) + public void testWrapCopyWrongStart() { + MemorySegmentFactory.wrapCopy(new byte[]{1, 2, 3}, 10, 3); + } + + @Test(expected = IllegalArgumentException.class) + public void testWrapCopyWrongEnd() { + MemorySegmentFactory.wrapCopy(new byte[]{1, 2, 3}, 0, 10); + } + +} diff --git a/flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java b/flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java new file mode 100644 index 0000000..e2d4d3f --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/util/CloseableIteratorTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; + +/** + * {@link CloseableIterator} test. + */ +@SuppressWarnings("unchecked") +public class CloseableIteratorTest { + + private static final String[] ELEMENTS = new String[]{"flink", "blink"}; + + @Test + public void testFlattenEmpty() throws Exception { + List<CloseableIterator<?>> iterators = asList( + CloseableIterator.flatten(), + CloseableIterator.flatten(CloseableIterator.empty()), + CloseableIterator.flatten(CloseableIterator.flatten())); + for (CloseableIterator<?> i : iterators) { + assertFalse(i.hasNext()); + i.close(); + } + } + + @Test + public void testFlattenIteration() { + CloseableIterator<String> iterator = CloseableIterator.flatten( + CloseableIterator.ofElement(ELEMENTS[0], unused -> { + }), + CloseableIterator.ofElement(ELEMENTS[1], unused -> { + }) + ); + + List<String> iterated = new ArrayList<>(); + iterator.forEachRemaining(iterated::add); + assertArrayEquals(ELEMENTS, iterated.toArray()); + } + + @Test(expected = TestException.class) + public void testFlattenErrorHandling() throws Exception { + List<String> closed = new ArrayList<>(); + CloseableIterator<String> iterator = CloseableIterator.flatten( + CloseableIterator.ofElement(ELEMENTS[0], e -> { + closed.add(e); + throw new TestException(); + }), + CloseableIterator.ofElement(ELEMENTS[1], closed::add) + ); + try { + iterator.close(); + } finally { + assertArrayEquals(ELEMENTS, closed.toArray()); + } + } + + private static class TestException extends RuntimeException { + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java new file mode 100644 index 0000000..c7e1cd8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.disk; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.RefCountedFile; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.util.CloseableIterator; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; + +import static org.apache.flink.core.memory.MemorySegmentFactory.wrap; +import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER; +import static org.apache.flink.util.IOUtils.closeAll; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * {@link CloseableIterator} of {@link Buffer buffers} over file content. + */ +@Internal +public class FileBasedBufferIterator implements CloseableIterator<Buffer> { + + private final RefCountedFile file; + private final FileInputStream stream; + private final int bufferSize; + + private int offset; + private int bytesToRead; + + public FileBasedBufferIterator(RefCountedFile file, int bytesToRead, int bufferSize) throws FileNotFoundException { + checkNotNull(file); + checkArgument(bytesToRead >= 0); + checkArgument(bufferSize > 0); + this.stream = new FileInputStream(file.getFile()); + this.file = file; + this.bufferSize = bufferSize; + this.bytesToRead = bytesToRead; + file.retain(); + } + + @Override + public boolean hasNext() { + return bytesToRead > 0; + } + + @Override + public Buffer next() { + byte[] buffer = new byte[bufferSize]; + int bytesRead = read(buffer); + checkState(bytesRead >= 0, "unexpected end of file, file = " + file.getFile() + ", offset=" + offset); + offset += bytesRead; + bytesToRead -= bytesRead; + return new NetworkBuffer(wrap(buffer), FreeingBufferRecycler.INSTANCE, DATA_BUFFER, bytesRead); + } + + private int read(byte[] buffer) { + int limit = Math.min(buffer.length, bytesToRead); + try { + return stream.read(buffer, offset, limit); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws Exception { + closeAll(stream, file::release); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java index 9cffde3..45d6ad7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java @@ -24,7 +24,10 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.disk.FileBasedBufferIterator; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.StringUtils; @@ -41,15 +44,18 @@ import java.util.Random; import static java.lang.Math.max; import static java.lang.Math.min; +import static org.apache.flink.core.memory.MemorySegmentFactory.wrapCopy; +import static org.apache.flink.core.memory.MemorySegmentFactory.wrapInt; import static org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.singleBufferIterator; import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES; +import static org.apache.flink.util.CloseableIterator.empty; import static org.apache.flink.util.FileUtils.writeCompletely; import static org.apache.flink.util.IOUtils.closeAllQuietly; final class SpanningWrapper { - private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes - private static final int FILE_BUFFER_SIZE = 2 * 1024 * 1024; + private static final int DEFAULT_THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes + private static final int DEFAULT_FILE_BUFFER_SIZE = 2 * 1024 * 1024; private final byte[] initialBuffer = new byte[1024]; @@ -61,6 +67,8 @@ final class SpanningWrapper { final ByteBuffer lengthBuffer; + private final int fileBufferSize; + private FileChannel spillingChannel; private byte[] buffer; @@ -79,16 +87,21 @@ final class SpanningWrapper { private DataInputViewStreamWrapper spillFileReader; + private int thresholdForSpilling; + SpanningWrapper(String[] tempDirs) { - this.tempDirs = tempDirs; + this(tempDirs, DEFAULT_THRESHOLD_FOR_SPILLING, DEFAULT_FILE_BUFFER_SIZE); + } + SpanningWrapper(String[] tempDirectories, int threshold, int fileBufferSize) { + this.tempDirs = tempDirectories; this.lengthBuffer = ByteBuffer.allocate(LENGTH_BYTES); this.lengthBuffer.order(ByteOrder.BIG_ENDIAN); - this.recordLength = -1; - this.serializationReadBuffer = new DataInputDeserializer(); this.buffer = initialBuffer; + this.thresholdForSpilling = threshold; + this.fileBufferSize = fileBufferSize; } /** @@ -101,7 +114,7 @@ final class SpanningWrapper { } private boolean isAboveSpillingThreshold() { - return recordLength > THRESHOLD_FOR_SPILLING; + return recordLength > thresholdForSpilling; } void addNextChunkFromMemorySegment(MemorySegment segment, int offset, int numBytes) throws IOException { @@ -137,7 +150,7 @@ final class SpanningWrapper { accumulatedRecordBytes += length; if (hasFullRecord()) { spillingChannel.close(); - spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile.getFile()), FILE_BUFFER_SIZE)); + spillFileReader = new DataInputViewStreamWrapper(new BufferedInputStream(new FileInputStream(spillFile.getFile()), fileBufferSize)); } } @@ -170,22 +183,26 @@ final class SpanningWrapper { CloseableIterator<Buffer> getUnconsumedSegment() throws IOException { if (isReadingLength()) { - return singleBufferIterator(copyLengthBuffer()); + return singleBufferIterator(wrapCopy(lengthBuffer.array(), 0, lengthBuffer.position())); } else if (isAboveSpillingThreshold()) { - throw new UnsupportedOperationException("Unaligned checkpoint currently do not support spilled records."); + return createSpilledDataIterator(); } else if (recordLength == -1) { - return CloseableIterator.empty(); // no remaining partial length or data + return empty(); // no remaining partial length or data } else { return singleBufferIterator(copyDataBuffer()); } } - private MemorySegment copyLengthBuffer() { - int position = lengthBuffer.position(); - MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(position); - lengthBuffer.position(0); - segment.put(0, lengthBuffer, position); - return segment; + @SuppressWarnings("unchecked") + private CloseableIterator<Buffer> createSpilledDataIterator() throws IOException { + if (spillingChannel != null && spillingChannel.isOpen()) { + spillingChannel.force(false); + } + return CloseableIterator.flatten( + toSingleBufferIterator(wrapInt(recordLength)), + new FileBasedBufferIterator(spillFile, min(accumulatedRecordBytes, recordLength), fileBufferSize), + leftOverData == null ? empty() : toSingleBufferIterator(wrapCopy(leftOverData.getArray(), leftOverStart, leftOverLimit)) + ); } private MemorySegment copyDataBuffer() throws IOException { @@ -289,4 +306,9 @@ final class SpanningWrapper { return lengthBuffer.position() > 0; } + private static CloseableIterator<Buffer> toSingleBufferIterator(MemorySegment segment) { + NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, segment.size()); + return CloseableIterator.ofElement(buffer, Buffer::recycleBuffer); + } + } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java new file mode 100644 index 0000000..be57e21 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.serialization; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.util.CloseableIterator; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import static org.apache.flink.core.memory.MemorySegmentFactory.wrap; +import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES; +import static org.junit.Assert.assertArrayEquals; + +/** + * {@link SpanningWrapper} test. + */ +public class SpanningWrapperTest { + + private static final Random random = new Random(); + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void testLargeUnconsumedSegment() throws Exception { + int recordLen = 100; + int firstChunk = (int) (recordLen * .9); + int spillingThreshold = (int) (firstChunk * .9); + + byte[] record1 = recordBytes(recordLen); + byte[] record2 = recordBytes(recordLen * 2); + + SpanningWrapper spanningWrapper = new SpanningWrapper(new String[]{folder.newFolder().getAbsolutePath()}, spillingThreshold, recordLen); + spanningWrapper.transferFrom(wrapNonSpanning(record1, firstChunk), recordLen); + spanningWrapper.addNextChunkFromMemorySegment(wrap(record1), firstChunk, recordLen - firstChunk + LENGTH_BYTES); + spanningWrapper.addNextChunkFromMemorySegment(wrap(record2), 0, record2.length); + + CloseableIterator<Buffer> unconsumedSegment = spanningWrapper.getUnconsumedSegment(); + + spanningWrapper.getInputView().readFully(new byte[recordLen], 0, recordLen); // read out from file + spanningWrapper.transferLeftOverTo(new NonSpanningWrapper()); // clear any leftover + spanningWrapper.transferFrom(wrapNonSpanning(recordBytes(recordLen), recordLen), recordLen); // overwrite with new data + + assertArrayEquals(concat(record1, record2), toByteArray(unconsumedSegment)); + } + + private byte[] recordBytes(int recordLen) { + byte[] inputData = randomBytes(recordLen + LENGTH_BYTES); + for (int i = 0; i < Integer.BYTES; i++) { + inputData[Integer.BYTES - i - 1] = (byte) (recordLen >>> i * 8); + } + return inputData; + } + + private NonSpanningWrapper wrapNonSpanning(byte[] bytes, int len) { + NonSpanningWrapper nonSpanningWrapper = new NonSpanningWrapper(); + MemorySegment segment = wrap(bytes); + nonSpanningWrapper.initializeFromMemorySegment(segment, 0, len); + nonSpanningWrapper.readInt(); // emulate read length performed in getNextRecord to move position + return nonSpanningWrapper; + } + + private byte[] toByteArray(CloseableIterator<Buffer> unconsumed) { + final List<Buffer> buffers = new ArrayList<>(); + try { + unconsumed.forEachRemaining(buffers::add); + byte[] result = new byte[buffers.stream().mapToInt(Buffer::readableBytes).sum()]; + int offset = 0; + for (Buffer buffer : buffers) { + int len = buffer.readableBytes(); + buffer.getNioBuffer(0, len).get(result, offset, len); + offset += len; + } + return result; + } finally { + buffers.forEach(Buffer::recycleBuffer); + } + } + + private byte[] randomBytes(int length) { + byte[] inputData = new byte[length]; + random.nextBytes(inputData); + return inputData; + } + + private byte[] concat(byte[] input1, byte[] input2) { + byte[] expected = new byte[input1.length + input2.length]; + System.arraycopy(input1, 0, expected, 0, input1.length); + System.arraycopy(input2, 0, expected, input1.length, input2.length); + return expected; + } + +}
