This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new fa4a6991 [Flink-30892] Clean codes for memory and tests
fa4a6991 is described below
commit fa4a69913987416d7fd6b3ebe21d0eb39945fff2
Author: Feng Wang <[email protected]>
AuthorDate: Fri Feb 3 17:58:13 2023 +0800
[Flink-30892] Clean codes for memory and tests
This closes #498
---
.../flink/table/store/memory/MemorySegment.java | 8 ----
.../flink/table/store/memory/MemoryUtils.java | 52 ----------------------
...est.java => HadoopLocalFileIOBehaviorTest.java} | 7 +--
.../table/store/memory/MemorySegmentTestBase.java | 17 +++----
4 files changed, 11 insertions(+), 73 deletions(-)
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemorySegment.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemorySegment.java
index b0edac65..b254a07c 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemorySegment.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemorySegment.java
@@ -102,14 +102,6 @@ public final class MemorySegment {
}
}
- public long getAddress() {
- if (heapMemory == null) {
- return address;
- } else {
- throw new IllegalStateException("Memory segment does not represent
off heap memory");
- }
- }
-
public ByteBuffer wrap(int offset, int length) {
return wrapInternal(offset, length);
}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemoryUtils.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemoryUtils.java
index 955f2b5f..9ff236ae 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemoryUtils.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/memory/MemoryUtils.java
@@ -33,10 +33,6 @@ public class MemoryUtils {
private static final long BUFFER_ADDRESS_FIELD_OFFSET =
getClassFieldOffset(Buffer.class, "address");
- private static final long BUFFER_CAPACITY_FIELD_OFFSET =
- getClassFieldOffset(Buffer.class, "capacity");
- private static final Class<?> DIRECT_BYTE_BUFFER_CLASS =
- getClassByName("java.nio.DirectByteBuffer");
@SuppressWarnings("restriction")
private static sun.misc.Unsafe getUnsafe() {
@@ -94,54 +90,6 @@ public class MemoryUtils {
}
}
- /**
- * Allocates unsafe native memory.
- *
- * @param size size of the unsafe memory to allocate.
- * @return address of the allocated unsafe memory
- */
- static long allocateUnsafe(long size) {
- return UNSAFE.allocateMemory(Math.max(1L, size));
- }
-
- /**
- * Creates a cleaner to release the unsafe memory.
- *
- * @param address address of the unsafe memory to release
- * @param customCleanup A custom action to clean up GC
- * @return action to run to release the unsafe memory manually
- */
- static Runnable createMemoryCleaner(long address, Runnable customCleanup) {
- return () -> {
- releaseUnsafe(address);
- customCleanup.run();
- };
- }
-
- private static void releaseUnsafe(long address) {
- UNSAFE.freeMemory(address);
- }
-
- /**
- * Wraps the unsafe native memory with a {@link ByteBuffer}.
- *
- * @param address address of the unsafe memory to wrap
- * @param size size of the unsafe memory to wrap
- * @return a {@link ByteBuffer} which is a view of the given unsafe memory
- */
- static ByteBuffer wrapUnsafeMemoryWithByteBuffer(long address, int size) {
- //noinspection OverlyBroadCatchBlock
- try {
- ByteBuffer buffer = (ByteBuffer)
UNSAFE.allocateInstance(DIRECT_BYTE_BUFFER_CLASS);
- UNSAFE.putLong(buffer, BUFFER_ADDRESS_FIELD_OFFSET, address);
- UNSAFE.putInt(buffer, BUFFER_CAPACITY_FIELD_OFFSET, size);
- buffer.clear();
- return buffer;
- } catch (Throwable t) {
- throw new Error("Failed to wrap unsafe off-heap memory with
ByteBuffer", t);
- }
- }
-
/**
* Get native memory address wrapped by the given {@link ByteBuffer}.
*
diff --git
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/fs/HadoopLocalFileSystemBehaviorTest.java
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/fs/HadoopLocalFileIOBehaviorTest.java
similarity index 91%
rename from
flink-table-store-common/src/test/java/org/apache/flink/table/store/fs/HadoopLocalFileSystemBehaviorTest.java
rename to
flink-table-store-common/src/test/java/org/apache/flink/table/store/fs/HadoopLocalFileIOBehaviorTest.java
index 6a24627a..8b799be2 100644
---
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/fs/HadoopLocalFileSystemBehaviorTest.java
+++
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/fs/HadoopLocalFileIOBehaviorTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.fs;
-import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.table.store.fs.hadoop.HadoopFileIO;
import org.apache.hadoop.conf.Configuration;
@@ -26,17 +25,19 @@ import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.util.VersionInfo;
import org.junit.jupiter.api.io.TempDir;
+import java.net.URI;
+
import static org.assertj.core.api.Assumptions.assumeThat;
/** Behavior tests for Hadoop Local. */
-class HadoopLocalFileSystemBehaviorTest extends FileIOBehaviorTestBase {
+class HadoopLocalFileIOBehaviorTest extends FileIOBehaviorTestBase {
@TempDir private java.nio.file.Path tmp;
@Override
protected FileIO getFileSystem() throws Exception {
org.apache.hadoop.fs.FileSystem fs = new RawLocalFileSystem();
- fs.initialize(LocalFileSystem.getLocalFsURI(), new Configuration());
+ fs.initialize(URI.create("file:///"), new Configuration());
HadoopFileIO fileIO = new HadoopFileIO();
fileIO.setFileSystem(fs);
return fileIO;
diff --git
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/memory/MemorySegmentTestBase.java
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/memory/MemorySegmentTestBase.java
index 49e7bc33..40b382bf 100644
---
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/memory/MemorySegmentTestBase.java
+++
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/memory/MemorySegmentTestBase.java
@@ -35,13 +35,10 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.lessThan;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -973,13 +970,13 @@ public abstract class MemorySegmentTestBase {
seg1.put(0, bytes1);
seg2.put(0, bytes2);
- assertThat(seg1.compare(seg2, 0, 0, 3, 4), lessThan(0));
- assertThat(seg1.compare(seg2, 0, 0, 3, 3), equalTo(0));
- assertThat(seg1.compare(seg2, 0, 0, 3, 2), greaterThan(0));
+ assertThat(seg1.compare(seg2, 0, 0, 3, 4)).isLessThan(0);
+ assertThat(seg1.compare(seg2, 0, 0, 3, 3)).isEqualTo(0);
+ assertThat(seg1.compare(seg2, 0, 0, 3, 2)).isGreaterThan(0);
// test non-zero offset
- assertThat(seg1.compare(seg2, 1, 1, 2, 3), lessThan(0));
- assertThat(seg1.compare(seg2, 1, 1, 2, 2), equalTo(0));
- assertThat(seg1.compare(seg2, 1, 1, 2, 1), greaterThan(0));
+ assertThat(seg1.compare(seg2, 1, 1, 2, 3)).isLessThan(0);
+ assertThat(seg1.compare(seg2, 1, 1, 2, 2)).isEqualTo(0);
+ assertThat(seg1.compare(seg2, 1, 1, 2, 1)).isGreaterThan(0);
}
@Test