tillrohrmann commented on a change in pull request #14966:
URL: https://github.com/apache/flink/pull/14966#discussion_r579644550
##########
File path:
flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
##########
@@ -31,7 +31,7 @@
import static org.apache.flink.util.Preconditions.checkArgument;
/**
- * A factory for (hybrid) memory segments ({@link HybridMemorySegment}).
+ * A factory for (hybrid) memory segments ({@link OffHeapMemorySegment}).
Review comment:
`A factory for {@link OffHeapMemorySegment}`.
##########
File path:
flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
##########
@@ -24,33 +24,22 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import java.util.function.Function;
import static org.apache.flink.core.memory.MemoryUtils.getByteBufferAddress;
/**
- * This class represents a piece of memory managed by Flink.
+ * This class represents a piece of off-heap memory managed by Flink.
*
- * <p>The memory can be on-heap, off-heap direct or off-heap unsafe, this is
transparently handled
- * by this class.
- *
- * <p>This class specializes byte access and byte copy calls for heap memory,
while reusing the
- * multi-byte type accesses and cross-segment operations from the
MemorySegment.
- *
- * <p>This class subsumes the functionality of the {@link
- * org.apache.flink.core.memory.HeapMemorySegment}, but is a bit less
efficient for operations on
- * individual bytes.
+ * <p>The memory can direct or unsafe, this is transparently handled by this
class.
Review comment:
`can be`
##########
File path:
flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java
##########
@@ -51,7 +51,7 @@ MemorySegment createSegment(int size, Object owner) {
@Test
public void testHybridHeapSegmentSpecifics() {
final int bufSize = 411;
- HybridMemorySegment seg = (HybridMemorySegment) createSegment(bufSize);
+ OffHeapMemorySegment seg = (OffHeapMemorySegment)
createSegment(bufSize);
Review comment:
The cast should not be necessary if we change the return type of
`createSegment`.
##########
File path:
flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java
##########
@@ -37,19 +37,18 @@ public void testHeapSegment() {
@Test
public void testHybridOnHeapSegment() {
- testBigAndLittleEndianAccessUnaligned(MemorySegmentFactory.wrap(new
byte[11111]));
+ testBigAndLittleEndianAccessUnaligned(
+ MemorySegmentFactory.wrapHeapSegment(new byte[11111]));
}
@Test
public void testHybridOffHeapSegment() {
- testBigAndLittleEndianAccessUnaligned(
- MemorySegmentFactory.allocateUnpooledOffHeapMemory(11111));
+
testBigAndLittleEndianAccessUnaligned(MemorySegmentFactory.allocateDirectSegment(11111));
}
@Test
public void testHybridOffHeapUnsafeSegment() {
Review comment:
Could be renamed into `testUnsafeOffHeapSegment`
##########
File path:
flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
##########
@@ -130,7 +130,7 @@ public static MemorySegment
allocateUnpooledOffHeapMemory(int size) {
*/
public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object
owner) {
Review comment:
Should we rename the factory methods to reflect which type of off heap
memory they allocate?
##########
File path:
flink-core/src/main/java/org/apache/flink/core/memory/UnsafeMemorySegment.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This class represents a piece of unsafe off-heap memory managed by Flink.
+ *
+ * <p>Note that memory segments should usually not be allocated manually, but
rather through the
+ * {@link MemorySegmentFactory}.
+ */
+@Internal
+public class UnsafeMemorySegment extends OffHeapMemorySegment {
+ @Nullable private final Runnable cleaner;
+
+ /**
+ * Creates a new memory segment that represents the memory backing the
given unsafe byte buffer.
+ * Note that the given ByteBuffer must be direct {@link
+ * java.nio.ByteBuffer#allocateDirect(int)}, otherwise this method with
throw an
+ * IllegalArgumentException.
+ *
+ * <p>The memory segment references the given owner.
+ *
+ * @param buffer The byte buffer whose memory is represented by this
memory segment.
+ * @param owner The owner references by this memory segment.
+ * @param cleaner The cleaner to be called on free segment.
+ * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not
direct.
+ */
+ UnsafeMemorySegment(
+ @Nonnull ByteBuffer buffer, @Nullable Object owner, @Nullable
Runnable cleaner) {
Review comment:
When is `cleaner` `null`?
##########
File path:
flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java
##########
@@ -26,7 +26,7 @@
import static org.junit.Assert.assertTrue;
-/** Tests for the {@link HybridMemorySegment} in off-heap mode using unsafe
memory. */
+/** Tests for the {@link OffHeapMemorySegment} in off-heap mode using unsafe
memory. */
@RunWith(Parameterized.class)
public class HybridOffHeapUnsafeMemorySegmentTest extends
MemorySegmentTestBase {
Review comment:
Will be renamed later, right?
##########
File path:
flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java
##########
@@ -51,7 +51,7 @@ MemorySegment createSegment(int size, Object owner) {
@Test
public void testHybridHeapSegmentSpecifics() {
Review comment:
Name of the test can be updated.
##########
File path:
flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java
##########
@@ -30,7 +30,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-/** Tests for the {@link HybridMemorySegment} in off-heap mode using direct
memory. */
+/** Tests for the {@link OffHeapMemorySegment} in off-heap mode using direct
memory. */
@RunWith(Parameterized.class)
public class HybridOffHeapDirectMemorySegmentTest extends
MemorySegmentTestBase {
Review comment:
I guess this test will be renamed in a later commit, right?
##########
File path:
flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java
##########
@@ -37,19 +37,18 @@ public void testHeapSegment() {
@Test
public void testHybridOnHeapSegment() {
- testBigAndLittleEndianAccessUnaligned(MemorySegmentFactory.wrap(new
byte[11111]));
+ testBigAndLittleEndianAccessUnaligned(
+ MemorySegmentFactory.wrapHeapSegment(new byte[11111]));
}
@Test
public void testHybridOffHeapSegment() {
Review comment:
could be renamed into `testDirectOffHeapSegment`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]