tillrohrmann commented on a change in pull request #14966:
URL: https://github.com/apache/flink/pull/14966#discussion_r579644550



##########
File path: 
flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
##########
@@ -31,7 +31,7 @@
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * A factory for (hybrid) memory segments ({@link HybridMemorySegment}).
+ * A factory for (hybrid) memory segments ({@link OffHeapMemorySegment}).

Review comment:
       `A factory for {@link OffHeapMemorySegment}`.

##########
File path: 
flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
##########
@@ -24,33 +24,22 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
 import static org.apache.flink.core.memory.MemoryUtils.getByteBufferAddress;
 
 /**
- * This class represents a piece of memory managed by Flink.
+ * This class represents a piece of off-heap memory managed by Flink.
  *
- * <p>The memory can be on-heap, off-heap direct or off-heap unsafe, this is 
transparently handled
- * by this class.
- *
- * <p>This class specializes byte access and byte copy calls for heap memory, 
while reusing the
- * multi-byte type accesses and cross-segment operations from the 
MemorySegment.
- *
- * <p>This class subsumes the functionality of the {@link
- * org.apache.flink.core.memory.HeapMemorySegment}, but is a bit less 
efficient for operations on
- * individual bytes.
+ * <p>The memory can direct or unsafe, this is transparently handled by this 
class.

Review comment:
       Typo: this should read `The memory can be direct or unsafe`.

##########
File path: 
flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java
##########
@@ -51,7 +51,7 @@ MemorySegment createSegment(int size, Object owner) {
     @Test
     public void testHybridHeapSegmentSpecifics() {
         final int bufSize = 411;
-        HybridMemorySegment seg = (HybridMemorySegment) createSegment(bufSize);
+        OffHeapMemorySegment seg = (OffHeapMemorySegment) 
createSegment(bufSize);

Review comment:
       The cast should not be necessary if we change the return type of 
`createSegment`.

##########
File path: 
flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java
##########
@@ -37,19 +37,18 @@ public void testHeapSegment() {
 
     @Test
     public void testHybridOnHeapSegment() {
-        testBigAndLittleEndianAccessUnaligned(MemorySegmentFactory.wrap(new 
byte[11111]));
+        testBigAndLittleEndianAccessUnaligned(
+                MemorySegmentFactory.wrapHeapSegment(new byte[11111]));
     }
 
     @Test
     public void testHybridOffHeapSegment() {
-        testBigAndLittleEndianAccessUnaligned(
-                MemorySegmentFactory.allocateUnpooledOffHeapMemory(11111));
+        
testBigAndLittleEndianAccessUnaligned(MemorySegmentFactory.allocateDirectSegment(11111));
     }
 
     @Test
     public void testHybridOffHeapUnsafeSegment() {

Review comment:
       Could be renamed to `testUnsafeOffHeapSegment`.

##########
File path: 
flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
##########
@@ -130,7 +130,7 @@ public static MemorySegment 
allocateUnpooledOffHeapMemory(int size) {
      */
     public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object 
owner) {

Review comment:
       Should we rename the factory methods to reflect which type of off heap 
memory they allocate?

##########
File path: 
flink-core/src/main/java/org/apache/flink/core/memory/UnsafeMemorySegment.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This class represents a piece of unsafe off-heap memory managed by Flink.
+ *
+ * <p>Note that memory segments should usually not be allocated manually, but 
rather through the
+ * {@link MemorySegmentFactory}.
+ */
+@Internal
+public class UnsafeMemorySegment extends OffHeapMemorySegment {
+    @Nullable private final Runnable cleaner;
+
+    /**
+     * Creates a new memory segment that represents the memory backing the 
given unsafe byte buffer.
+     * Note that the given ByteBuffer must be direct {@link
+     * java.nio.ByteBuffer#allocateDirect(int)}, otherwise this method with 
throw an
+     * IllegalArgumentException.
+     *
+     * <p>The memory segment references the given owner.
+     *
+     * @param buffer The byte buffer whose memory is represented by this 
memory segment.
+     * @param owner The owner references by this memory segment.
+     * @param cleaner The cleaner to be called on free segment.
+     * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not 
direct.
+     */
+    UnsafeMemorySegment(
+            @Nonnull ByteBuffer buffer, @Nullable Object owner, @Nullable 
Runnable cleaner) {

Review comment:
       When is `cleaner` `null`?

##########
File path: 
flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java
##########
@@ -26,7 +26,7 @@
 
 import static org.junit.Assert.assertTrue;
 
-/** Tests for the {@link HybridMemorySegment} in off-heap mode using unsafe 
memory. */
+/** Tests for the {@link OffHeapMemorySegment} in off-heap mode using unsafe 
memory. */
 @RunWith(Parameterized.class)
 public class HybridOffHeapUnsafeMemorySegmentTest extends 
MemorySegmentTestBase {

Review comment:
       Will be renamed later, right?

##########
File path: 
flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java
##########
@@ -51,7 +51,7 @@ MemorySegment createSegment(int size, Object owner) {
     @Test
     public void testHybridHeapSegmentSpecifics() {

Review comment:
       Name of the test can be updated.

##########
File path: 
flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java
##########
@@ -30,7 +30,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-/** Tests for the {@link HybridMemorySegment} in off-heap mode using direct 
memory. */
+/** Tests for the {@link OffHeapMemorySegment} in off-heap mode using direct 
memory. */
 @RunWith(Parameterized.class)
 public class HybridOffHeapDirectMemorySegmentTest extends 
MemorySegmentTestBase {

Review comment:
       I guess this test will be renamed in a later commit, right?

##########
File path: 
flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java
##########
@@ -37,19 +37,18 @@ public void testHeapSegment() {
 
     @Test
     public void testHybridOnHeapSegment() {
-        testBigAndLittleEndianAccessUnaligned(MemorySegmentFactory.wrap(new 
byte[11111]));
+        testBigAndLittleEndianAccessUnaligned(
+                MemorySegmentFactory.wrapHeapSegment(new byte[11111]));
     }
 
     @Test
     public void testHybridOffHeapSegment() {

Review comment:
       Could be renamed to `testDirectOffHeapSegment`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to